# Spark Analysis

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, avg, count, desc, when
import warnings
warnings.filterwarnings('ignore')

## Spark setup

In [2]:
# Create (or reuse) the session that all later cells reach through `spark`.
# The application name and master URL are fixed here.
spark = (
    SparkSession.builder
    .appName("StockPortfolioAnalysis")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

# Report what the session is actually bound to.
print("✅ Spark Session created successfully")
print(f"Spark Version: {spark.version}")
print(f"Master: {spark.sparkContext.master}")

✅ Spark Session created successfully
Spark Version: 3.5.3
Master: spark://spark-master:7077


-----------------
## Data extraction

In [7]:
# Load the transactions CSV: the first row supplies the column names and
# Spark infers the column types from the data.
# The frame is cached because the later cells run many separate actions
# (show / count / write) on `df`; without caching each action would re-read
# and re-parse the CSV file.
# NOTE: this cell needs a live session. Once `spark.stop()` has been executed,
# re-run the session setup cell first, otherwise the read fails with
# "LiveListenerBus is stopped".
df = spark.read.csv(
    "FULL_STOCKS.csv",  # Adjust path as needed
    header=True,
    inferSchema=True,
).cache()

print("✅ Data loaded successfully")
# First action on the frame; it also populates the cache.
print(f"Total rows: {df.count()}")
print(f"Total columns: {len(df.columns)}")

Py4JJavaError: An error occurred while calling o25.read.
: java.lang.IllegalStateException: LiveListenerBus is stopped.
	at org.apache.spark.scheduler.LiveListenerBus.addToQueue(LiveListenerBus.scala:92)
	at org.apache.spark.scheduler.LiveListenerBus.addToStatusQueue(LiveListenerBus.scala:75)
	at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:115)
	at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:143)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:143)
	at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:142)
	at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:162)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:160)
	at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:157)
	at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:699)
	at org.apache.spark.sql.SparkSession.read(SparkSession.scala:783)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [None]:
# Preview: the first ten rows with untruncated values, then the schema.
banner = "=" * 80
print(banner, "FIRST 10 RECORDS", banner, sep="\n")
df.show(10, truncate=False)

print("\nDataFrame Schema:")
df.printSchema()

-----------
## Questions

### Q1. Total Trading Volume by Stock Ticker

In [None]:
# Q1: sum `quantity` per ticker, largest totals first.
banner = "=" * 80
print(banner, "Q1: Total Trading Volume for Each Stock Ticker", banner, sep="\n")

q1_result = (
    df.groupBy("stock_ticker")
    .agg(spark_sum("quantity").alias("total_volume"))
    .orderBy(col("total_volume").desc())
)

q1_result.show(20, truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    q1_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/q1_volume_by_ticker")
)

print("✅ Q1 completed and saved")

### Q2. Average Stock Price by Sector

In [None]:
# Q2: mean `stock_price` per sector, highest average first.
banner = "=" * 80
print(banner, "Q2: Average Stock Price by Sector", banner, sep="\n")

q2_result = (
    df.groupBy("stock_sector")
    .agg(avg("stock_price").alias("average_price"))
    .orderBy(col("average_price").desc())
)

q2_result.show(truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    q2_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/q2_avg_price_by_sector")
)

print("✅ Q2 completed and saved")

### Q3. Weekend Transactions (Buy vs Sell)

In [None]:
# Q3: count weekend transactions per transaction type, most frequent first.
banner = "=" * 80
print(banner, "Q3: Buy vs Sell Transactions on Weekends", banner, sep="\n")

# Keep only rows whose weekend flag equals 1.
weekend_df = df.where(col("is_weekend") == 1)

q3_result = (
    weekend_df.groupBy("transaction_type")
    .agg(count("transaction_id").alias("transaction_count"))
    .orderBy(col("transaction_count").desc())
)

q3_result.show(truncate=False)

# Overall number of weekend rows, independent of transaction type.
total_weekend = weekend_df.count()
print(f"\nTotal weekend transactions: {total_weekend}")

# Write the result as one headered CSV partition, replacing earlier output.
(
    q3_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/q3_weekend_transactions")
)

print("✅ Q3 completed and saved")

### Q4. Customers with More Than 10 Transactions

In [None]:
# Q4: customers whose transaction count is strictly greater than 10,
# ordered from most to least active.
banner = "=" * 80
print(banner, "Q4: Customers with More Than 10 Transactions", banner, sep="\n")

q4_result = (
    df.groupBy("customer_id")
    .agg(count("transaction_id").alias("transaction_count"))
    .where(col("transaction_count") > 10)
    .orderBy(col("transaction_count").desc())
)

print(f"Total customers with >10 transactions: {q4_result.count()}")
print("\nTop 20 customers:")
q4_result.show(20, truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    q4_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/q4_active_customers")
)

print("✅ Q4 completed and saved")

### Q5. Total Trade Amount by Day of Week

In [None]:
# Q5: sum `total_trade_amount` per day name, largest total first.
banner = "=" * 80
print(
    banner,
    "Q5: Total Trade Amount per Day of Week (Highest to Lowest)",
    banner,
    sep="\n",
)

q5_result = (
    df.groupBy("day_name")
    .agg(spark_sum("total_trade_amount").alias("total_trade_amount"))
    .orderBy(desc("total_trade_amount"))
)

q5_result.show(truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    q5_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/q5_trade_by_day")
)

print("✅ Q5 completed and saved")

---------------------
## Register DataFrame for SQL Queries

In [None]:
# Make `df` queryable from spark.sql() under the view name "stocks";
# any existing temp view with that name is replaced.
df.createOrReplaceTempView(name="stocks")
print("✅ DataFrame registered as 'stocks' view for SQL queries")

### Q1. Top 5 Most Traded Stocks by Quantity

In [None]:
# SQL Q1: the five tickers with the largest summed quantity.
banner = "=" * 80
print(
    banner,
    "SQL Q1: Top 5 Most Traded Stock Tickers by Total Quantity",
    banner,
    sep="\n",
)

sql_query_1 = """
    SELECT 
        stock_ticker,
        SUM(quantity) as total_quantity
    FROM stocks
    GROUP BY stock_ticker
    ORDER BY total_quantity DESC
    LIMIT 5
"""

sql_q1_result = spark.sql(sql_query_1)
sql_q1_result.show(truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    sql_q1_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/sql_q1_top_5_stocks")
)

print("✅ SQL Q1 completed and saved")

### Q2. Average Trade Amount by Account Type

In [None]:
# SQL Q2: average trade amount and row count per account type,
# highest average first.
banner = "=" * 80
print(
    banner,
    "SQL Q2: Average Trade Amount by Customer Account Type",
    banner,
    sep="\n",
)

sql_query_2 = """
    SELECT 
        customer_account_type,
        AVG(total_trade_amount) as avg_trade_amount,
        COUNT(*) as transaction_count
    FROM stocks
    GROUP BY customer_account_type
    ORDER BY avg_trade_amount DESC
"""

sql_q2_result = spark.sql(sql_query_2)
sql_q2_result.show(truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    sql_q2_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/sql_q2_avg_by_account")
)

print("✅ SQL Q2 completed and saved")

### Q3. Holiday vs Non-Holiday Transactions

In [None]:
# SQL Q3: transaction count and total trade amount for holiday vs
# non-holiday rows.
print("="*80)
print("SQL Q3: Transactions During Holidays vs Non-Holidays")
print("="*80)

# Group by the derived label rather than the raw `is_holiday` column.
# Grouping by the raw column yields one output row per distinct raw value, so
# a NULL (or any value other than 0/1) would produce a second 'Non-Holiday'
# row. Grouping by the CASE expression guarantees at most one row per label.
# Ordering by the label keeps 'Holiday' ahead of 'Non-Holiday', matching the
# previous `ORDER BY is_holiday DESC` ordering.
sql_query_3 = """
    SELECT
        CASE
            WHEN is_holiday = 1 THEN 'Holiday'
            ELSE 'Non-Holiday'
        END as period_type,
        COUNT(*) as transaction_count,
        SUM(total_trade_amount) as total_trade_amount
    FROM stocks
    GROUP BY
        CASE
            WHEN is_holiday = 1 THEN 'Holiday'
            ELSE 'Non-Holiday'
        END
    ORDER BY period_type
"""

sql_q3_result = spark.sql(sql_query_3)
sql_q3_result.show(truncate=False)

# Save results as a single headered CSV partition, replacing earlier output.
sql_q3_result.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .csv("output/spark_results/sql_q3_holiday_comparison")

print("✅ SQL Q3 completed and saved")

### Q4. Sectors with Highest Weekend Volume

In [None]:
# SQL Q4: per-sector weekend totals (quantity, row count, trade value),
# ordered by summed quantity, largest first.
banner = "=" * 80
print(
    banner,
    "SQL Q4: Stock Sectors with Highest Weekend Trading Volume",
    banner,
    sep="\n",
)

sql_query_4 = """
    SELECT 
        stock_sector,
        SUM(quantity) as total_weekend_volume,
        COUNT(*) as weekend_transactions,
        SUM(total_trade_amount) as total_weekend_value
    FROM stocks
    WHERE is_weekend = 1
    GROUP BY stock_sector
    ORDER BY total_weekend_volume DESC
"""

sql_q4_result = spark.sql(sql_query_4)
sql_q4_result.show(truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    sql_q4_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/sql_q4_weekend_sectors")
)

print("✅ SQL Q4 completed and saved")

### Q5. Buy vs Sell by Liquidity Tier

In [None]:
# SQL Q5: per liquidity tier, the BUY and SELL amounts and counts plus the
# overall amount; tiers are ordered by overall amount, largest first.
# The comparison against 'BUY' / 'SELL' is an exact string match.
banner = "=" * 80
print(
    banner,
    "SQL Q5: Total Buy vs Sell Amount by Stock Liquidity Tier",
    banner,
    sep="\n",
)

sql_query_5 = """
    SELECT 
        stock_liquidity_tier,
        SUM(CASE WHEN transaction_type = 'BUY' THEN total_trade_amount ELSE 0 END) as total_buy_amount,
        SUM(CASE WHEN transaction_type = 'SELL' THEN total_trade_amount ELSE 0 END) as total_sell_amount,
        SUM(total_trade_amount) as total_amount,
        COUNT(CASE WHEN transaction_type = 'BUY' THEN 1 END) as buy_count,
        COUNT(CASE WHEN transaction_type = 'SELL' THEN 1 END) as sell_count
    FROM stocks
    GROUP BY stock_liquidity_tier
    ORDER BY total_amount DESC
"""

sql_q5_result = spark.sql(sql_query_5)
sql_q5_result.show(truncate=False)

# Write the result as one headered CSV partition, replacing earlier output.
(
    sql_q5_result.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("output/spark_results/sql_q5_liquidity_analysis")
)

print("✅ SQL Q5 completed and saved")

In [None]:
# Print the closing summary, then shut the session down. Once stopped, the
# session setup cell has to be re-run before any cell that uses `spark`.
banner = "=" * 80
print("\n" + banner)
for summary_line in (
    "ANALYSIS SUMMARY",
    banner,
    "✅ All 5 Spark DataFrame questions completed",
    "✅ All 5 Spark SQL questions completed",
    "✅ Results saved to output/spark_results/",
    banner,
):
    print(summary_line)

spark.stop()
print("\n✅ Spark session stopped")