In [0]:
# Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime, timedelta
from pyspark.sql.window import Window
import random

# Start a Spark session
spark = SparkSession.builder.appName("DatabricksFinancialDemo").getOrCreate()

In [0]:
# Generate Mock Financial Data for Apple Inc. (AAPL) over the last year
dates = [(datetime.now() - timedelta(days=i)) for i in range(365)]
closing_prices = [random.uniform(120, 150) for _ in range(365)]  # Random closing prices between 120 and 150
volumes = [random.randint(50000000, 200000000) for _ in range(365)]  # Random volumes between 50M and 200M

data = list(zip(dates, closing_prices, volumes))
columns = ["Date", "Close", "Volume"]

df = spark.createDataFrame(data, columns)
df.show()

+--------------------+------------------+---------+
|                Date|             Close|   Volume|
+--------------------+------------------+---------+
|2023-10-20 02:46:...| 125.9914276556439|125827043|
|2023-10-19 02:46:...| 130.2954505655251| 76891314|
|2023-10-18 02:46:...|129.94139201395572|155375990|
|2023-10-17 02:46:...|137.92935379576267|196324605|
|2023-10-16 02:46:...|148.97024916556785|120882182|
|2023-10-15 02:46:...|146.03799697510834|191437670|
|2023-10-14 02:46:...|123.26679129488465| 95653833|
|2023-10-13 02:46:...|141.43950041307306|138182486|
|2023-10-12 02:46:...|149.77764187373316| 58018881|
|2023-10-11 02:46:...| 128.5871930871246| 52605476|
|2023-10-10 02:46:...|120.78438798338215|194187107|
|2023-10-09 02:46:...|126.98117136564747|149013988|
|2023-10-08 02:46:...|120.13397562216726|141042273|
|2023-10-07 02:46:...|139.67989812984766|192899289|
|2023-10-06 02:46:...| 139.5078706382219|192214316|
|2023-10-05 02:46:...| 128.7878121782671| 74512905|
|2023-10-04 

In [0]:
# Basic Manipulations

# Average Closing Price
avg_close = df.agg(avg("Close")).collect()[0][0]
print(f"Mock Average Closing Price of AAPL over the last year: ${avg_close:.2f}")

# Day with the Highest Volume
max_volume_day_row = df.orderBy(desc("Volume")).select("Date").head()
max_volume_day = max_volume_day_row.Date
print(f"Day with the highest mock trading volume for AAPL: {max_volume_day}")

Mock Average Closing Price of AAPL over the last year: $134.94
Day with the highest mock trading volume for AAPL: 2023-04-16 02:46:13.921871


In [0]:
# More Advanced Manipulations

# Add a new column for month and year
df_with_month_year = df.withColumn("Year", year("Date")).withColumn("Month", month("Date"))

# Calculate monthly average closing price
monthly_avg = df_with_month_year.groupBy("Year", "Month").agg(avg("Close").alias("MonthlyAvgClose"))

# Calculate month-over-month percentage change
window_spec = Window.partitionBy("Year").orderBy("Month")
monthly_avg_with_change = monthly_avg.withColumn("PrevMonthClose", lag("MonthlyAvgClose", 1).over(window_spec))
monthly_avg_with_change = monthly_avg_with_change.withColumn("MoMChange", (col("MonthlyAvgClose") - col("PrevMonthClose")) / col("PrevMonthClose") * 100)

monthly_avg_with_change.show()

+----+-----+------------------+------------------+-------------------+
|Year|Month|   MonthlyAvgClose|    PrevMonthClose|          MoMChange|
+----+-----+------------------+------------------+-------------------+
|2022|   10|132.95428697814998|              null|               null|
|2022|   11|  136.179644385601|132.95428697814998|  2.425914561131133|
|2022|   12|134.08902028868337|  136.179644385601|-1.5351957382102515|
|2023|    1|  133.772915256676|              null|               null|
|2023|    2|137.08693019024818|  133.772915256676|  2.477343733754661|
|2023|    3| 135.8383543429674|137.08693019024818|-0.9107913099724578|
|2023|    4|132.78548904093674| 135.8383543429674|-2.2474251228947595|
|2023|    5| 135.1510964152695|132.78548904093674| 1.7815255201594125|
|2023|    6|134.27167567194596| 135.1510964152695|-0.6506944942728501|
|2023|    7|135.39790138196392|134.27167567194596| 0.8387664072723495|
|2023|    8| 134.6266694443306|135.39790138196392|-0.5696040557214024|
|2023|

In [0]:
# Using spark.sql to find the month with the lowest average closing price
result = spark.sql("""
    SELECT Year, Month, MonthlyAvgClose as LowestAvgClose
    FROM (
        SELECT Year, Month, AVG(Close) as MonthlyAvgClose
        FROM apple_financials
        GROUP BY Year, Month
    ) as monthly_data
    ORDER BY MonthlyAvgClose ASC
    LIMIT 1
""")

result.show()

+----+-----+------------------+
|Year|Month|    LowestAvgClose|
+----+-----+------------------+
|2022|   10|131.42102960680432|
+----+-----+------------------+

