In [None]:
import time

import dotenv
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from src.utils.session import create_spark_session

# Load variables from a .env file into the process environment so the
# os.getenv(...) lookups further down the notebook can see them.
dotenv.load_dotenv()

In [None]:
import os

# Echo the Spark endpoint settings found in the environment (None when unset).
for env_name in ("SPARK_MASTER_URL", "SPARK_CONNECT_SERVER"):
    print(os.getenv(env_name))

In [None]:
# Build the notebook-wide Spark session through the project helper.
spark = create_spark_session()

# Keep the built-in session catalog as the default; per the original note this
# is necessary to read from s3.
spark.conf.set("spark.sql.defaultCatalog", "spark_catalog")

# Session diagnostics (the "Connected to" line used to be printed twice).
print(f"Spark version: {spark.version}")
print(f"Spark session type: {type(spark)}")
print(f"Connected to: {spark.conf.get('spark.remote')}")

In [None]:
# Cell 3: Define data paths and check availability
# Location pattern of each raw dataset, keyed by dataset name.
data_paths = dict(
    site='s3a://raw/site/*.parquet',
    system='s3a://raw/system/*.parquet',
    pvdata='s3a://raw/pvdata/**/*.parquet',
)

# One placeholder slot per dataset, every slot starting out as None.
files_d = dict.fromkeys(data_paths)

# NOTE(review): only the heading is printed; no availability check is
# performed in this cell.
print("Checking data paths:")

In [None]:
# import pandas as pd
# import s3fs

# df = pd.read_parquet('s3://raw/site/part-00000-a1617b0d-de41-4ca8-8586-51326d82d03f-c000.snappy.parquet', 
#                      storage_options={'key': 'admin', 'secret': 'password', 'client_kwargs': {'endpoint_url': 'http://localhost:9000'}})
# print(df.head())

In [None]:
import os

from pyspark.sql import SparkSession

# Standalone S3A connectivity check: build a session with explicit S3A
# settings, read the site dataset once, then release the session.

# Credentials come from the environment (populated from .env by dotenv) and
# only fall back to the local-development values this cell used to hard-code.
# NOTE(review): confirm these are the variable names your .env actually uses.
s3a_access_key = os.getenv("AWS_ACCESS_KEY_ID", "admin")
s3a_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY", "password")

spark_new = SparkSession.builder \
    .appName("TestS3A") \
    .remote("sc://localhost:15002") \
    .config("spark.hadoop.fs.s3a.access.key", s3a_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.sql.catalogImplementation", "in-memory") \
    .config("spark.sql.defaultCatalog", "spark_catalog") \
    .getOrCreate()

try:
    print("Default Catalog:", spark_new.conf.get("spark.sql.defaultCatalog"))
    print("Catalog Implementation:", spark_new.conf.get("spark.sql.catalogImplementation"))

    # Try reading with the new session
    df = spark_new.read.parquet("s3a://raw/site/")
    df.show(5, truncate=False)
finally:
    # getOrCreate() returns an already-existing session when there is one, so
    # spark_new can be the very same object as the notebook-wide `spark`
    # created in an earlier cell. Stopping it unconditionally would break
    # every later cell; only stop a session this cell really owns. The
    # finally block also guarantees the session is released if the read fails.
    if spark_new is not spark:
        spark_new.stop()

In [None]:
# Cell 4: Read site information
print("Reading site information...")

# Time only the creation of the DataFrame reader result.
start_time = time.time()
sites_df = spark.read.load(data_paths['site'], format='parquet')
read_time = time.time() - start_time
print(f"✅ Sites data read in {read_time:.2f} seconds")

# Schema first, then a five-row preview with untruncated values.
print("\nSites Schema:")
sites_df.printSchema()
print("\nSites Sample Data:")
sites_df.show(5, truncate=False)

site_total = sites_df.count()
print(f"\nTotal sites: {site_total}")

In [None]:
# Cell 5: Read system information
print("Reading system information...")

# Time only the creation of the DataFrame reader result.
start_time = time.time()
systems_df = spark.read.format('parquet').load(data_paths['system'])
read_time = time.time() - start_time
print(f"✅ Systems data read in {read_time:.2f} seconds")

# Schema first, then a five-row preview with untruncated values.
print("\nSystems Schema:")
systems_df.printSchema()
print("\nSystems Sample Data:")
systems_df.show(5, truncate=False)

system_total = systems_df.count()
print(f"\nTotal systems: {system_total}")

In [None]:
# Cell 6: Explore PV data structure
print("Exploring PV data structure...")

# Partition inspected first. The success message is built from this value so
# it can no longer disagree with the path that was read (it used to read
# system_id=3 while reporting "system-id=2").
sample_system_id = 3


def _show_pv_schema_and_sample(pv_frame):
    """Print the schema and a five-row, untruncated preview of a PV DataFrame."""
    print("\nPV Data Schema:")
    pv_frame.printSchema()

    print("\nPV Data Sample:")
    pv_frame.show(5, truncate=False)


try:
    pv_sample = spark.read.parquet(f"s3a://raw/pvdata/system_id={sample_system_id}/")
    print(f"✅ Found system_id={sample_system_id} with {pv_sample.count()} records")
    _show_pv_schema_and_sample(pv_sample)

except Exception as e:
    print(f"❌ Error reading PV data: {str(e)}")
    print("Trying alternative approach...")

    # Fallback: read every PV file matched by the configured glob.
    try:
        all_pv = spark.read.parquet(data_paths['pvdata'])
        print(f"✅ Found total PV data with {all_pv.count()} records")
        _show_pv_schema_and_sample(all_pv)

    except Exception as e2:
        print(f"❌ Error reading all PV data: {str(e2)}")

In [None]:
# Cell 7: Performance test - Read large PV data
banner_rule = "=" * 50
print("Performance Test: Reading 14GB PV Data")
print(banner_rule)

# The timed window covers creating the DataFrame and the full count().
start_time = time.time()
pv_df = spark.read.parquet("s3a://raw/pvdata/")
total_records = pv_df.count()
read_time = time.time() - start_time

records_per_second = total_records / read_time

print(f"✅ PV data read in {read_time:.2f} seconds")
print(f"✅ Total PV records: {total_records:,}")  # 270 sec
print(f"✅ Average read speed: {records_per_second:,.0f} records/second")


In [None]:
# Set Iceberg as the default catalog
# NOTE(review): this setting stays in effect for the cells that follow.
spark.conf.set("spark.sql.defaultCatalog", "iceberg")

# The INSERT ... SELECT * FROM pv_temp that used to run in this cell has been
# dropped: pv_temp is only registered by the following cell, so on a fresh
# kernel the statement failed, and the following cell performs the same load
# (keeping both would append the rows twice).
#
# Create the target table. The column list matches the CREATE TABLE in the
# following cell (including `timestamp`), so whichever runs first defines the
# same schema.
spark.sql("""
CREATE TABLE IF NOT EXISTS pv_data (
    site_id STRING,
    year INT,
    month INT,
    day INT,
    power DOUBLE,
    timestamp TIMESTAMP
) USING iceberg
PARTITIONED BY (year, month, day)
""")

In [None]:
# Expose the PV DataFrame to SQL under the view name pv_temp.
pv_df.createOrReplaceTempView("pv_temp")

# DDL for the Iceberg target table; IF NOT EXISTS turns this into a no-op
# when pv_data is already present.
create_pv_table_sql = """
CREATE TABLE IF NOT EXISTS pv_data (
    site_id STRING,
    year INT,
    month INT,
    day INT,
    power DOUBLE,
    -- Add other columns as needed
    timestamp TIMESTAMP
) USING iceberg
PARTITIONED BY (year, month, day)
"""

# Appends every row of pv_temp; running this cell again appends them again.
# NOTE(review): SELECT * relies on pv_temp's columns lining up with the table
# definition above - confirm against the PV schema.
insert_pv_rows_sql = """
INSERT INTO pv_data
SELECT * FROM pv_temp
"""

spark.sql(create_pv_table_sql)
spark.sql(insert_pv_rows_sql)

In [None]:
# Cell 8: Data quality check
print("Data Quality Check")
print("=" * 30)

print("\nNull value counts:")
# The loop variable used to be called `col`, which shadowed the col() function
# and made `col(col)` fail with "'str' object is not callable". The null counts
# are now built from the explicit `F` namespace and computed for all columns in
# a single aggregation instead of one full scan per column.
null_count_exprs = [
    F.count(F.when(F.col(column_name).isNull(), 1)).alias(column_name)
    for column_name in pv_df.columns
]
if null_count_exprs:  # agg() needs at least one expression
    null_count_row = pv_df.agg(*null_count_exprs).first()
    # zip by position so the pairing does not depend on column-name uniqueness.
    for column_name, null_count in zip(pv_df.columns, null_count_row):
        if null_count > 0:
            print(f"  {column_name}: {null_count:,} nulls")

print("\nDuplicate check:")
total_rows = pv_df.count()
distinct_rows = pv_df.distinct().count()
duplicates = total_rows - distinct_rows
print(f"  Total rows: {total_rows:,}")
print(f"  Distinct rows: {distinct_rows:,}")
print(f"  Duplicates: {duplicates:,}")

print("\nData types:")
for field in pv_df.schema.fields:
    print(f"  {field.name}: {field.dataType}")

In [None]:
# Cell 9: Create temporary views for SQL
# View names must match what the SQL cells query: the join cell reads
# `sites`, `systems` and `pvdata`. The systems view used to be registered as
# "system", so the join could not resolve `systems`.
sites_df.createOrReplaceTempView("sites")
systems_df.createOrReplaceTempView("systems")
pv_df.createOrReplaceTempView("pvdata")

print("Created temporary views for SQL operations")
print(f"  - sites: {sites_df.count()} records")
print(f"  - systems: {systems_df.count()} records")
print(f"  - pvdata: {pv_df.count()} records")

In [None]:
# Cell 10: Join data using Spark SQL
print("Joining site, system, and PV data...")
start_time = time.time()

# pv.* already carries system_id (it is the join key), so also selecting
# sys.system_id produced two system_id columns and made the later
# `system_id` references on joined_data ambiguous. The value is identical on
# both sides of the join, so only the copy from pv.* is kept.
# NOTE(review): if pvdata also has a site_id column, the same clash applies to
# s.site_id - confirm against the PV schema.
joined_query = """
SELECT 
    s.site_id,
    s.site_name,
    sys.system_name,
    pv.*
FROM pvdata pv
JOIN systems sys ON pv.system_id = sys.system_id
JOIN sites s ON sys.site_id = s.site_id
"""

joined_df = spark.sql(joined_query)

# spark.sql() only builds the query plan; the join itself executes on the
# count() below, so this figure is planning time.
join_time = time.time() - start_time
print(f"✅ Join completed in {join_time:.2f} seconds")

# Mark the result for caching so the aggregation cells reuse it.
joined_df.cache()

count_start = time.time()
joined_count = joined_df.count()
count_time = time.time() - count_start

print(f"✅ Joined data records: {joined_count:,}")
print(f"✅ Count operation took: {count_time:.2f} seconds")

print("\nJoined Data Sample:")
joined_df.show(5, truncate=False)

In [None]:
# Cell 11: Create view for joined data and perform aggregations
joined_df.createOrReplaceTempView("joined_data")

section_rule = "=" * 40
print("Performing aggregations...")
print(section_rule)

# Cell 12: Total energy production by site
# Sum, mean and row count of energy_production per site, largest total first.
site_totals_sql = """
SELECT 
    site_id,
    site_name,
    SUM(energy_production) as total_energy,
    AVG(energy_production) as avg_energy,
    COUNT(*) as record_count
FROM joined_data
GROUP BY site_id, site_name
ORDER BY total_energy DESC
"""

print("1. Total energy production by site:")
start_time = time.time()
site_totals = spark.sql(site_totals_sql)
query_time = time.time() - start_time
print(f"✅ Query completed in {query_time:.2f} seconds")

site_totals.show(10, truncate=False)


In [None]:
# Cell 13: System performance analysis
# Sum, mean, max, min and row count of energy_production per system,
# largest total first.
system_analysis_sql = """
SELECT 
    system_id,
    system_name,
    site_name,
    SUM(energy_production) as total_energy,
    AVG(energy_production) as avg_energy,
    MAX(energy_production) as max_energy,
    MIN(energy_production) as min_energy,
    COUNT(*) as record_count
FROM joined_data
GROUP BY system_id, system_name, site_name
ORDER BY total_energy DESC
"""

print("2. System performance analysis:")
start_time = time.time()
system_analysis = spark.sql(system_analysis_sql)
query_time = time.time() - start_time
print(f"✅ Query completed in {query_time:.2f} seconds")

system_analysis.show(10, truncate=False)


In [None]:
# Cell 14: Time-based analysis
# Sum, mean and row count of energy_production per (date, hour) bucket;
# newest dates first, limited to 20 rows.
time_analysis_sql = """
SELECT 
    DATE(timestamp) as date,
    HOUR(timestamp) as hour,
    SUM(energy_production) as daily_energy,
    AVG(energy_production) as avg_hourly_energy,
    COUNT(*) as record_count
FROM joined_data
GROUP BY DATE(timestamp), HOUR(timestamp)
ORDER BY date DESC, hour
LIMIT 20
"""

print("3. Time-based energy production analysis:")
start_time = time.time()
time_analysis = spark.sql(time_analysis_sql)
query_time = time.time() - start_time
print(f"✅ Query completed in {query_time:.2f} seconds")

time_analysis.show(20, truncate=False)

# Cell 15: Performance monitoring and summary
print("Performance Summary")
print("=" * 30)

# sparkContext is not available on Spark Connect sessions (this notebook
# reports 'spark.remote' for its session), so the lookup is guarded instead of
# letting it abort the whole summary.
try:
    app_id = spark.sparkContext.applicationId
except Exception as exc:
    app_id = None
    print(f"Application ID: not available ({type(exc).__name__})")
else:
    print(f"Application ID: {app_id}")

# Best-effort executor listing. The former bare `except:` also swallowed
# KeyboardInterrupt/SystemExit and hid the failure reason.
# NOTE(review): verify that StatusTracker actually exposes getExecutorMetrics
# in the PySpark version in use.
try:
    executors = spark.sparkContext.statusTracker().getExecutorMetrics()
    print(f"Active executors: {len(executors) if executors else 'Unknown'}")
except Exception as exc:
    print(f"Executor info not available ({type(exc).__name__})")

print(f"\nMemory configuration:")
print(f"  Driver memory: {spark.conf.get('spark.driver.memory', 'Not set')}")
print(f"  Executor memory: {spark.conf.get('spark.executor.memory', 'Not set')}")

print(f"\nData sizes:")
print(f"  Sites: {sites_df.count():,} records")
print(f"  Systems: {systems_df.count():,} records")
print(f"  PV Data: {pv_df.count():,} records")
print(f"  Joined: {joined_df.count():,} records")

# Cell 16: Cleanup
print("Cleaning up cached data...")
joined_df.unpersist()  # drop the cache requested in the join cell

print("✅ Analysis complete!")
print("\nKey findings:")

# Topics covered by the notebook, printed as a numbered list.
finding_topics = (
    "Data read performance",
    "Join operation efficiency",
    "Aggregation query performance",
    "Memory usage patterns",
    "Overall workflow efficiency",
)
for position, topic in enumerate(finding_topics, start=1):
    print(f"{position}. {topic}")
