In [1]:
pip install python-dotenv sqlalchemy requests pandas pyspark py4j


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType, TimestampType
import requests
import os
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine
import pandas as pd
import findspark
from pyspark import SparkConf
findspark.init('C:/spark')

In [3]:
# Initialize SparkSession
spark = SparkSession.builder \
            .master("local[*]")\
            .appName("FinancedataETL") \
            .config("spark.jars", "postgresql-42.7.3.jar") \
            .config("spark.driver.extraClassPath",  "postgresql-42.7.3.jar") \
            .config("spark.executor.extraClassPath",  "postgresql-42.7.3.jar") \
            .config("spark.driver.memory", "8g")\
            .config("spark.executor.memory", "8g")\
            .config("spark.network.timeout", "600s")\
            .getOrCreate()



# Verify Spark session
print("PySpark is working correctly with Spark version:", spark.version)

# Stop Spark session
#spark.stop()

PySpark is working correctly with Spark version: 3.4.4


In [4]:
#Knowing the spark UI
spark

In [5]:
# API request function
def fetch_data(symbol, api_key='KA3DQ2SRS8MOUN8J', output_size='full'):
    url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&outputsize={output_size}&apikey={api_key}'
    response = requests.get(url)
    return response.json()

In [6]:

import aiohttp
import asyncio
# Asynchronous request function
async def fetch_data_async(symbol, api_key='KA3DQ2SRS8MOUN8J', output_size='full'):
    url = f'https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol={symbol}&outputsize={output_size}&apikey={api_key}'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Fetch data for all symbols
async def fetch_all_data(symbols):
    tasks = [fetch_data_async(symbol) for symbol in symbols]
    return await asyncio.gather(*tasks)


In [7]:
# Define schema for the DataFrame
schema = StructType([
    StructField("date", DateType(), True),
    StructField("daily_open", FloatType(), True),
    StructField("daily_high", FloatType(), True),
    StructField("daily_low", FloatType(), True),
    StructField("daily_close", FloatType(), True),
    StructField("daily_volume", IntegerType(), True),
    StructField("last_refreshed", DateType(), True),
    StructField("output_size", StringType(), True),
    StructField("time_zone", StringType(), True),
    StructField("description", StringType(), True),
    StructField("symbol", StringType(), True),
])

In [8]:

import pandas as pd

# Process data and accumulate in a Pandas DataFrame first
all_rows = []

async def process_data(symbols):
    responses = await fetch_all_data(symbols)
    for symbol, data in zip(symbols, responses):
        meta_data = data.get('Meta Data', {})
        description = meta_data.get('1. Information', '').split(' ')[0] + ' ' + meta_data.get('1. Information', '').split(' ')[1]
        last_refreshed = meta_data.get('3. Last Refreshed', None)
        output_size = meta_data.get('4. Output Size', 'N/A')
        time_zone = meta_data.get('5. Time Zone', 'N/A')
        
        # Extract time series data
        time_series = data.get('Time Series (Daily)', {})
        for date, daily_data in time_series.items():
              # Convert `date` string to `datetime.date` object
            date_obj = datetime.strptime(date, '%Y-%m-%d').date()

            # Check the format of `last_refreshed`
            if last_refreshed and len(last_refreshed) == 10:
                last_refreshed_dt = datetime.strptime(last_refreshed, '%Y-%m-%d')
            elif last_refreshed:
                last_refreshed_dt = datetime.strptime(last_refreshed, '%Y-%m-%d')
            else:
                last_refreshed_dt = None
            all_rows.append({
                'date': date_obj,
                'daily_open': float(daily_data['1. open']),
                'daily_high': float(daily_data['2. high']),
                'daily_low': float(daily_data['3. low']),
                'daily_close': float(daily_data['4. close']),
                'daily_volume': int(daily_data['5. volume']),
                'last_refreshed': last_refreshed_dt,
                'output_size': output_size,
                'time_zone': time_zone,
                'description': description,
                'symbol': symbol
            })

symbols = ['TSCO.LON', 'IBM', 'MBG.DEX', 'SHOP.TRT']
await process_data(symbols)

# Convert to Pandas DataFrame
df_pandas = pd.DataFrame(all_rows)


In [9]:

# If there is data, convert to Spark DataFrame
if not df_pandas.empty:
    df = spark.createDataFrame(df_pandas, schema=schema)
else:
    print("No data found to load into Spark DataFrame.")


In [10]:
#Checking the dataset
#df=df.repartition(4)  


In [11]:
df.cache()

DataFrame[date: date, daily_open: float, daily_high: float, daily_low: float, daily_close: float, daily_volume: int, last_refreshed: date, output_size: string, time_zone: string, description: string, symbol: string]

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define a 5-day window spec (based on row count instead of date range)
window_spec = Window.orderBy("date").rowsBetween(-4, 0)

# Calculate the 5-day moving average for the "daily_close" column
df = df.withColumn("5_day_moving_avg", F.round(F.avg("daily_close").over(window_spec), 2))

In [None]:
# Selecting specific columns and filter rows 
from pyspark.sql.functions import col
df_filtered = df.select("date","last_refreshed","daily_open","daily_high","daily_low","daily_close", "daily_volume", "5_day_moving_avg","description","symbol") 
df_filtered.show()

+----------+--------------+----------+----------+---------+-----------+------------+----------------+------------+------+
|      date|last_refreshed|daily_open|daily_high|daily_low|daily_close|daily_volume|5_day_moving_avg| description|symbol|
+----------+--------------+----------+----------+---------+-----------+------------+----------------+------------+------+
|1999-12-03|    2024-11-15|    109.81|    112.87|   107.94|     111.87|    14680300|          105.56|Daily Prices|   IBM|
|1999-12-06|    2024-11-15|     113.0|     116.5|    112.5|      116.0|     9928300|          107.92|Daily Prices|   IBM|
|1999-12-07|    2024-11-15|     117.0|    119.19|   115.44|     116.62|    11326100|          110.64|Daily Prices|   IBM|
|1999-12-08|    2024-11-15|    116.25|     121.0|    115.5|     118.28|     8139800|          113.61|Daily Prices|   IBM|
|1999-12-09|    2024-11-15|     120.5|    122.12|   112.25|     113.37|    16643000|          115.23|Daily Prices|   IBM|
|1999-12-21|    2024-11-

In [14]:
df.count()

18768

In [15]:
# Grouping by symbol and calculating the average daily close 
df_grouped = df_filtered.groupBy("symbol").agg(F.round(F.avg("daily_open"), 2).alias("avg_daily_open"), F.round(F.avg("daily_close"), 2).alias("avg_daily_close"), F.round(F.avg("daily_volume"), 2).alias("avg_daily_volume"))

df_grouped.show()

+--------+--------------+---------------+----------------+
|  symbol|avg_daily_open|avg_daily_close|avg_daily_volume|
+--------+--------------+---------------+----------------+
|TSCO.LON|        298.33|          298.2|   2.174187031E7|
|     IBM|        150.59|         150.68|      5141763.07|
|SHOP.TRT|         718.3|         717.61|      3043116.08|
+--------+--------------+---------------+----------------+



In [None]:
from pyspark.sql.functions import col, to_date
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, Integer, Float, String, DateTime, TIMESTAMP
import os
from dotenv import load_dotenv

# Loading environment variables
load_dotenv(override=True)

# Creating a PostgreSQL connection
engine = create_engine(
    'postgresql://{user}:{password}@{host}:{port}/{database}'.format(
        user=os.getenv('PG_USER'),
        password=os.getenv('PG_PASSWORD'),
        host=os.getenv('PG_HOST'),
        port=os.getenv('PG_PORT'),
        database=os.getenv('PG_DATABASE')
    )
)

# Function to get the last inserted date from the database, ensuring incremental loading
def get_last_loaded_date(engine):
    query = "SELECT MAX(date) FROM daily_finance;"
    result = engine.execute(query).fetchone()
    return result[0] if result[0] is not None else None

# Get the last loaded date from the database
last_loaded_date = get_last_loaded_date(engine)

# Convert 'date' column to datetime in PySpark (ensure consistency)
df_filtered = df_filtered.withColumn('date', to_date(col('date')))

# Filters the DataFrame to include only new data (dates greater than the last loaded date)
if last_loaded_date:
    # Convert last_loaded_date to the same format as PySpark date
    from pyspark.sql.functions import lit
    last_loaded_date = to_date(lit(last_loaded_date))

    # Filter for new data (only rows with a 'date' greater than the last loaded date)
    new_data = df_filtered.filter(col('date') > last_loaded_date)

    # Debugging information for date range
    if new_data.count() > 0:
        print("Last loaded date:", last_loaded_date)
        print("New data range:", new_data.agg({"date": "min"}).collect()[0][0], "to", new_data.agg({"date": "max"}).collect()[0][0])
else:
    # If there is no data loaded yet, load all
    new_data = df_filtered

# Check if there is any new data to insert
if new_data.count() > 0:
    # Define JDBC URL and properties for PostgreSQL connection
    jdbc_url = f"jdbc:postgresql://{os.getenv('PG_HOST')}:{os.getenv('PG_PORT')}/{os.getenv('PG_DATABASE')}"
    connection_properties = {
        "user": os.getenv('PG_USER'),
        "password": os.getenv('PG_PASSWORD'),
        "driver": "org.postgresql.Driver"
    }

    # Directly write the new data to PostgreSQL using Spark's JDBC connector
    new_data.write.jdbc(url=jdbc_url, table="daily_finance", mode="append", properties=connection_properties)

    print("New data loaded successfully into the database.")
else:
    print("No new data to load, just historic data.")


Last loaded date: Column<'to_date(TIMESTAMP '2024-11-08 00:00:00')'>
New data range: 2024-11-11 to 2024-11-15
New data loaded successfully into the database.
