In [3]:
import os
import psycopg2
import requests

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col, max

In [4]:
# Requires psycopg2-binary in the kernel environment; if it is missing, run: %pip install psycopg2-binary

In [5]:
# Path to the PostgreSQL JDBC driver jar; the same driver is used for the Redshift connection.
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

# Put the driver on the classpath through the environment before the session is created.
os.environ['SPARK_CLASSPATH'] = driver_path
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'

# Build (or reuse) a local SparkSession that also has the driver jar configured.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Conexion entre Pyspark y Redshift")
    .config("spark.jars", driver_path)
    .config("spark.executor.extraClassPath", driver_path)
    .getOrCreate()
)

In [6]:
# Short alias for the process environment; the Redshift settings and the API key are read from it.
env = os.environ

In [7]:
# Open a psycopg2 connection to Redshift. Each connection parameter is read from the
# environment variable AWS_REDSHIFT_<PARAM> (host, port, dbname, user, password).
connection_params = {
    name: env[f'AWS_REDSHIFT_{name.upper()}']
    for name in ('host', 'port', 'dbname', 'user', 'password')
}
conn = psycopg2.connect(**connection_params)

In [8]:
# Create the target table <schema>.finance if it does not exist yet.
# `with conn` commits the transaction on success and rolls it back if the statement
# raises; `with conn.cursor()` closes the cursor in both cases. The connection itself
# is left open (psycopg2's connection context manager does not close it).
with conn, conn.cursor() as cursor:
    cursor.execute(f'''
        CREATE TABLE IF NOT EXISTS {env['AWS_REDSHIFT_SCHEMA']}.finance (
            "date_from" VARCHAR(10),
            "1. open" VARCHAR(10),
            "2. high" VARCHAR(10),
            "3. low" VARCHAR(10),
            "4. close" VARCHAR(10), 
            "5. volume" VARCHAR(10),
            symbol VARCHAR(10) distkey
        ) sortkey(date_from);
    ''')
print("Table created!")

Table created!


In [9]:
def extract_data(symbol):
    """Download the monthly time series of ``symbol`` from Alpha Vantage.

    Args:
        symbol: Ticker symbol to query (for example ``'IBM'``).

    Returns:
        A Spark DataFrame with the string columns ``date_from``, ``1. open``,
        ``2. high``, ``3. low``, ``4. close``, ``5. volume`` plus a literal
        ``symbol`` column, or ``None`` when the request fails or the response
        does not contain a non-empty "Monthly Time Series" section.
    """
    try:
        # Let requests build and encode the query string; the timeout keeps the
        # notebook from hanging forever on an unresponsive server.
        response = requests.get(
            'https://www.alphavantage.co/query',
            params={
                'function': 'TIME_SERIES_MONTHLY',
                'symbol': symbol,
                'apikey': env["API_KEY"],
            },
            timeout=30,
        )
        response.raise_for_status()
        json_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        # NOTE(review): the exception text can include the request URL (and with it
        # the apikey query parameter) — consider redacting before sharing outputs.
        print(f"Error de solicitud: {e}")
        return None

    # Error and rate-limit responses are JSON objects without the series section.
    monthly_series = json_data.get("Monthly Time Series") if isinstance(json_data, dict) else None
    if not monthly_series:
        # Only report the top-level keys: the payload text itself is not echoed
        # in case it contains account details.
        received_keys = list(json_data) if isinstance(json_data, dict) else type(json_data).__name__
        print(f"Error de solicitud: respuesta sin 'Monthly Time Series' para {symbol} ({received_keys})")
        return None

    value_columns = ["1. open", "2. high", "3. low", "4. close", "5. volume"]
    rows = [
        (date, *(values_dict[column] for column in value_columns))
        for date, values_dict in monthly_series.items()
    ]

    # Build the DataFrame from all rows and tag every row with its symbol.
    df = spark.createDataFrame(rows, ["date_from", *value_columns])
    return df.withColumn("symbol", lit(symbol))

In [10]:
# Fetch each ticker separately; extract_data returns None when a download fails.
data_ibm = extract_data('IBM')
data_aapl = extract_data('AAPL')
data_tsla = extract_data('TSLA')

# Fail with an explicit message instead of an AttributeError on None.union(...).
failed_symbols = [
    symbol
    for symbol, frame in (('IBM', data_ibm), ('AAPL', data_aapl), ('TSLA', data_tsla))
    if frame is None
]
if failed_symbols:
    raise RuntimeError(f"No data could be extracted for: {', '.join(failed_symbols)}")

# Stack the three per-symbol DataFrames into one.
data = data_ibm.union(data_aapl).union(data_tsla)

In [11]:
# Inspect the combined DataFrame: print its column schema, then display its first rows.
data.printSchema()
data.show()

root
 |-- date_from: string (nullable = true)
 |-- 1. open: string (nullable = true)
 |-- 2. high: string (nullable = true)
 |-- 3. low: string (nullable = true)
 |-- 4. close: string (nullable = true)
 |-- 5. volume: string (nullable = true)
 |-- symbol: string (nullable = false)

+----------+--------+--------+--------+--------+---------+------+
| date_from| 1. open| 2. high|  3. low|4. close|5. volume|symbol|
+----------+--------+--------+--------+--------+---------+------+
|2023-06-12|128.4400|136.6200|127.7800|136.4200| 35186180|   IBM|
|2023-05-31|126.3500|130.0699|120.5500|128.5900| 95710890|   IBM|
|2023-04-28|130.9700|132.6100|124.5600|126.4100| 83664114|   IBM|
|2023-03-31|128.9000|131.4800|121.7100|131.0900|138093084|   IBM|
|2023-02-28|134.4900|137.3900|128.8600|129.3000| 76080679|   IBM|
|2023-01-31|141.1000|147.1800|132.9800|134.7300|105576019|   IBM|
|2022-12-30|149.9800|153.2100|137.1950|140.8900| 86426226|   IBM|
|2022-11-30|138.2500|150.4600|133.9700|148.9000| 93620235

In [12]:
# Duplicate check: compare the row count before and after dropping duplicate rows.
total_rows = data.count()
distinct_rows = data.dropDuplicates().count()

# Equal counts mean that no row was removed, i.e. there are no duplicates.
print(
    "El DataFrame no tiene duplicados."
    if total_rows == distinct_rows
    else "El DataFrame tiene duplicados."
)

El DataFrame no tiene duplicados.


In [13]:
# Write the combined DataFrame to <schema>.finance through the PostgreSQL JDBC driver.
# NOTE(review): per Spark's JDBC documentation, "overwrite" drops and recreates the
# table unless the `truncate` option is set, so the distkey/sortkey definition from
# the CREATE TABLE cell may not survive this write — confirm against Redshift.
write_options = {
    "url": f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}",
    "dbtable": f"{env['AWS_REDSHIFT_SCHEMA']}.finance",
    "user": env['AWS_REDSHIFT_USER'],
    "password": env['AWS_REDSHIFT_PASSWORD'],
    "driver": "org.postgresql.Driver",
}
data.write.format("jdbc").options(**write_options).mode("overwrite").save()

In [14]:
# Read the distinct symbols stored in <schema>.finance through Spark's JDBC reader.
# The query is wrapped as a derived table because it is passed via "dbtable".
# Note: this rebinds `data`, replacing the DataFrame that was written above.
query = f"select distinct symbol from {env['AWS_REDSHIFT_SCHEMA']}.finance"
read_options = {
    "url": f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}",
    "dbtable": f"({query}) as tmp_table",
    "user": env['AWS_REDSHIFT_USER'],
    "password": env['AWS_REDSHIFT_PASSWORD'],
    "driver": "org.postgresql.Driver",
}
data = spark.read.format("jdbc").options(**read_options).load()

In [15]:
# Inspect the result read back over JDBC: print its schema, then display its rows.
data.printSchema()
data.show()

root
 |-- symbol: string (nullable = true)

+------+
|symbol|
+------+
|   IBM|
|  AAPL|
|  TSLA|
+------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 54516)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/