In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import os

# Load MinIO credentials.
# The file is read as one comma-separated record: field 0 is the access key,
# field 1 is the secret key.
try:
    # Context manager guarantees the file handle is closed after reading.
    with open(f"/home/{os.getenv('USER')}/creds.txt", "r") as creds_handle:
        creds_file = creds_handle.read().strip().split(",")
    accesskey, secretkey = creds_file[0], creds_file[1]
    print("MinIO credentials loaded successfully.")
except FileNotFoundError:
    print("MinIO credentials file not found. Exiting.")
    exit(1)
except IndexError:
    # Fewer than two comma-separated fields: stop with a clear message
    # instead of an unhandled traceback.
    print("MinIO credentials file is malformed (expected 'accesskey,secretkey'). Exiting.")
    exit(1)

# Load MySQL credentials.
# The file is read as one comma-separated record: field 0 is the username,
# field 1 is the password.
try:
    # Context manager guarantees the file handle is closed after reading.
    with open(f"/home/{os.getenv('USER')}/database-creds.txt", "r") as db_creds_handle:
        db_creds_file = db_creds_handle.read().strip().split(",")
    dbusername, dbpassword = db_creds_file[0], db_creds_file[1]
    print("Database credentials loaded successfully.")
except FileNotFoundError:
    print("Database credentials file not found. Exiting.")
    exit(1)
except IndexError:
    # Fewer than two comma-separated fields: stop with a clear message
    # instead of an unhandled traceback.
    print("Database credentials file is malformed (expected 'username,password'). Exiting.")
    exit(1)

# Spark configuration: extra packages (S3A + MySQL JDBC driver), S3A access
# settings for the MinIO endpoint, and resource sizing for the cluster run.
spark_settings = [
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.3,org.apache.hadoop:hadoop-common:3.2.3,mysql:mysql-connector-java:8.0.33'),
    ('spark.hadoop.fs.s3a.access.key', accesskey),
    ('spark.hadoop.fs.s3a.secret.key', secretkey),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'),
    ('spark.hadoop.fs.s3a.path.style.access', 'true'),
    ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
    ("spark.hadoop.fs.s3a.endpoint", "http://system54.rice.iit.edu"),
    ("spark.driver.memory", "8g"),
    ("spark.executor.memory", "4g"),
    ("spark.cores.max", "10"),
    ("spark.executor.cores", "1"),
]
conf = SparkConf().setAll(spark_settings).setMaster("spark://sm.service.consul:7077")

# Initialize Spark session (reuses an already-running session if one exists)
session_builder = SparkSession.builder.appName("gchoi6-module-11-part1")
spark = session_builder.config(conf=conf).getOrCreate()

# Read the cleaned parquet dataset from MinIO; on any failure, report it,
# shut Spark down and exit with a non-zero status.
try:
    df = spark.read.format("parquet").load("s3a://itmd521/50-cleaned.parquet")
    print("Parquet file loaded successfully.")
except Exception as read_error:
    print(f"Failed to read parquet file from MinIO: {read_error}")
    spark.stop()
    exit(1)

# Save DataFrame to MySQL over JDBC, replacing the target table if it exists.
# A write failure is reported but does not abort; Spark is stopped either way.
try:
    mysql_url = "jdbc:mysql://system75.rice.iit.edu:3306/gchoi6"
    mysql_table = "d50"

    jdbc_options = {
        "url": mysql_url,
        "driver": "com.mysql.cj.jdbc.Driver",
        "dbtable": mysql_table,
        "user": dbusername,
        "password": dbpassword,
    }
    jdbc_writer = df.write.format("jdbc").options(**jdbc_options).mode("overwrite")
    jdbc_writer.save()
    print("Data successfully written to MySQL.")
except Exception as write_error:
    print(f"Failed to write data to MySQL: {write_error}")

# Stop Spark session
spark.stop()


In [None]:
# Test if data is loading properly from MinIO

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os

# Load MinIO credentials.
# The file is read as one comma-separated record: field 0 is the access key,
# field 1 is the secret key.
try:
    # Context manager guarantees the file handle is closed after reading.
    with open(f"/home/{os.getenv('USER')}/creds.txt", "r") as creds_handle:
        creds_file = creds_handle.read().strip().split(",")
    accesskey, secretkey = creds_file[0], creds_file[1]
    print("MinIO credentials loaded successfully.")
except FileNotFoundError:
    print("MinIO credentials file not found. Exiting.")
    exit(1)
except IndexError:
    # Fewer than two comma-separated fields: stop with a clear message
    # instead of an unhandled traceback.
    print("MinIO credentials file is malformed (expected 'accesskey,secretkey'). Exiting.")
    exit(1)

# Stop any existing SparkContext so the configuration below is applied to a
# fresh context rather than an already-running one.
active_context = SparkContext._active_spark_context
if active_context is not None:
    print("Stopping existing SparkContext...")
    active_context.stop()
    print("Existing SparkContext stopped successfully.")

# Spark configuration: S3A access settings for the MinIO endpoint, local master
spark_settings = [
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.3,org.apache.hadoop:hadoop-common:3.2.3'),
    ('spark.hadoop.fs.s3a.access.key', accesskey),
    ('spark.hadoop.fs.s3a.secret.key', secretkey),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'),
    ('spark.hadoop.fs.s3a.path.style.access', 'true'),
    ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
    ("spark.hadoop.fs.s3a.endpoint", "http://system54.rice.iit.edu"),
]
conf = SparkConf().setAll(spark_settings)
conf = conf.setMaster("local[*]").setAppName("Data Count Example")

# Initialize Spark session
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

# Sanity check 1: load s3a://itmd521/50-cleaned.parquet and report its row count.
# Any failure (load or count) is reported and the script carries on.
try:
    df_50 = spark.read.format("parquet").load("s3a://itmd521/50-cleaned.parquet")
    print("Data from 50-cleaned.parquet loaded successfully!")
    count_50 = df_50.count()  # count() is an action, so the data is actually read here
    print(f"Total records in df_50: {count_50}")
except Exception as load_error:
    print(f"Error loading data from 50-cleaned.parquet: {load_error}")

# Sanity check 2: same for s3a://itmd521/60-cleaned.parquet
try:
    df_60 = spark.read.format("parquet").load("s3a://itmd521/60-cleaned.parquet")
    print("Data from 60-cleaned.parquet loaded successfully!")
    count_60 = df_60.count()  # count() is an action, so the data is actually read here
    print(f"Total records in df_60: {count_60}")
except Exception as load_error:
    print(f"Error loading data from 60-cleaned.parquet: {load_error}")

# Stop Spark session
spark.stop()


In [None]:
# Convert to JSON format since 60.json does not exist and is required for the lab
# Save the JSON format under my username which is gchoi6

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os

# Load MinIO credentials.
# The file is read as one comma-separated record: field 0 is the access key,
# field 1 is the secret key.
try:
    # Context manager guarantees the file handle is closed after reading.
    with open(f"/home/{os.getenv('USER')}/creds.txt", "r") as creds_handle:
        creds_file = creds_handle.read().strip().split(",")
    accesskey, secretkey = creds_file[0], creds_file[1]
    print("MinIO credentials loaded successfully.")
except FileNotFoundError:
    print("MinIO credentials file not found. Exiting.")
    exit(1)
except IndexError:
    # Fewer than two comma-separated fields: stop with a clear message
    # instead of an unhandled traceback.
    print("MinIO credentials file is malformed (expected 'accesskey,secretkey'). Exiting.")
    exit(1)

# Stop any existing SparkContext so the configuration below is applied to a
# fresh context rather than an already-running one.
active_context = SparkContext._active_spark_context
if active_context is not None:
    print("Stopping existing SparkContext...")
    active_context.stop()
    print("Existing SparkContext stopped successfully.")

# Spark configuration: S3A access settings for the MinIO endpoint, local master
spark_settings = [
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.3,org.apache.hadoop:hadoop-common:3.2.3'),
    ('spark.hadoop.fs.s3a.access.key', accesskey),
    ('spark.hadoop.fs.s3a.secret.key', secretkey),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'),
    ('spark.hadoop.fs.s3a.path.style.access', 'true'),
    ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
    ("spark.hadoop.fs.s3a.endpoint", "http://system54.rice.iit.edu"),
]
conf = SparkConf().setAll(spark_settings)
conf = conf.setMaster("local[*]").setAppName("Parquet to JSON and Merge Example")

# Initialize Spark session
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

# Step 1: Read the 60-cleaned parquet dataset and re-write it as JSON under the
# gchoi6 bucket, replacing any previous output. Failures are reported only.
try:
    df_60 = spark.read.format("parquet").load("s3a://itmd521/60-cleaned.parquet")
    print("Data from 60-cleaned.parquet loaded successfully!")
    output_path = "s3a://gchoi6/output/60.json"
    json_writer = df_60.write.mode("overwrite")
    json_writer.json(output_path)
    print(f"60-cleaned.parquet successfully written to {output_path}")
except Exception as convert_error:
    print(f"Error processing Parquet to JSON conversion: {convert_error}")

# Stop Spark session
spark.stop()

In [None]:
# Now we have two datasets: the 60.json output stored in MinIO and the d50 table in MySQL (read via JDBC).
# Load both the d50 JDBC table and the 60.json file as DataFrames (df_50 and df_60).

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev, count, expr, lit
import os

# Load MinIO credentials.
# The file is read as one comma-separated record: field 0 is the access key,
# field 1 is the secret key.
try:
    # Context manager guarantees the file handle is closed after reading.
    with open(f"/home/{os.getenv('USER')}/creds.txt", "r") as creds_handle:
        creds_file = creds_handle.read().strip().split(",")
    accesskey, secretkey = creds_file[0], creds_file[1]
    print("MinIO credentials loaded successfully.")
except FileNotFoundError:
    print("MinIO credentials file not found. Exiting.")
    exit(1)
except IndexError:
    # Fewer than two comma-separated fields: stop with a clear message
    # instead of an unhandled traceback.
    print("MinIO credentials file is malformed (expected 'accesskey,secretkey'). Exiting.")
    exit(1)

# Load MySQL credentials.
# The file is read as one comma-separated record: field 0 is the username,
# field 1 is the password.
try:
    # Context manager guarantees the file handle is closed after reading.
    with open(f"/home/{os.getenv('USER')}/database-creds.txt", "r") as db_creds_handle:
        db_creds_file = db_creds_handle.read().strip().split(",")
    dbusername, dbpassword = db_creds_file[0], db_creds_file[1]
    print("Database credentials loaded successfully.")
except FileNotFoundError:
    print("Database credentials file not found. Exiting.")
    exit(1)
except IndexError:
    # Fewer than two comma-separated fields: stop with a clear message
    # instead of an unhandled traceback.
    print("Database credentials file is malformed (expected 'username,password'). Exiting.")
    exit(1)

# Stop any existing SparkContext so the configuration below is applied to a
# fresh context rather than an already-running one.
active_context = SparkContext._active_spark_context
if active_context is not None:
    print("Stopping existing SparkContext...")
    active_context.stop()
    print("Existing SparkContext stopped successfully.")

# Spark configuration: extra packages (S3A + MySQL JDBC driver) and S3A access
# settings for the MinIO endpoint, local master
spark_settings = [
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.3,org.apache.hadoop:hadoop-common:3.2.3,mysql:mysql-connector-java:8.0.33'),
    ('spark.hadoop.fs.s3a.access.key', accesskey),
    ('spark.hadoop.fs.s3a.secret.key', secretkey),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'),
    ('spark.hadoop.fs.s3a.path.style.access', 'true'),
    ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
    ("spark.hadoop.fs.s3a.endpoint", "http://system54.rice.iit.edu"),
]
conf = SparkConf().setAll(spark_settings)
conf = conf.setMaster("local[*]").setAppName("Lab11 Part 2")

# Initialize Spark session
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

# Load the 60 dataset from the JSON output in MinIO.
try:
    df_60 = spark.read.json("s3a://gchoi6/output/60.json")
    print("60.json loaded successfully.")
except Exception as e:
    print(f"Error loading 60.json: {e}")
    # Re-raise: carrying on would fail at the union below with a NameError,
    # or silently pick up a stale df_60 left over from an earlier cell.
    raise

# Load the 50 dataset from the d50 MySQL table over JDBC.
try:
    jdbc_url = "jdbc:mysql://system75.rice.iit.edu:3306/gchoi6"
    df_50 = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "d50") \
        .option("user", dbusername) \
        .option("password", dbpassword) \
        .load()
    print("d50 table loaded successfully.")
except Exception as e:
    print(f"Error loading d50 table: {e}")
    # Re-raise for the same reason as above: the union needs a valid df_50.
    raise

# Combine the dataframes.
# DataFrame.union() pairs columns purely by position, and the schema inferred
# from JSON is not guaranteed to share the JDBC table's column order or types.
# Project df_60 onto df_50's column names and types first, then union by name
# so every value lands in the column it belongs to.
df_60_aligned = df_60.select(
    [
        df_60[field.name].cast(field.dataType).alias(field.name)
        for field in df_50.schema.fields
    ]
)
combined_df = df_50.unionByName(df_60_aligned)

In [None]:
# Uncomment and try this if the above df_50 from the MySQL database is not loading properly.
# The code below is disabled by wrapping it in a string literal, so as written it does nothing.
# When enabled, it builds combined_df directly from the two cleaned parquet files in MinIO
# instead of from the MySQL table and the JSON output.
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, year, avg, stddev, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import os

# Load MinIO credentials
try:
    creds_file = open(f"/home/{os.getenv('USER')}/creds.txt", "r").read().strip().split(",")
    accesskey, secretkey = creds_file[0], creds_file[1]
    print("MinIO credentials loaded successfully.")
except FileNotFoundError:
    print("MinIO credentials file not found. Exiting.")
    exit(1)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Weather Data Analytics") \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.3,org.apache.hadoop:hadoop-common:3.2.3') \
    .config('spark.hadoop.fs.s3a.access.key', accesskey) \
    .config('spark.hadoop.fs.s3a.secret.key', secretkey) \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config('spark.hadoop.fs.s3a.path.style.access', 'true') \
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config("spark.hadoop.fs.s3a.endpoint", "http://system54.rice.iit.edu") \
    .getOrCreate()

# Load df_50 and df_60
try:
    df_50 = spark.read.parquet("s3a://itmd521/50-cleaned.parquet")
    df_60 = spark.read.parquet("s3a://itmd521/60-cleaned.parquet")
    print("Data from both parquet files loaded successfully!")
except Exception as e:
    print(f"Error loading data from parquet files: {e}")
    spark.stop()
    exit(1)

# Combine the dataframes
combined_df = df_50.union(df_60)
"""

# month() is needed by the filter below; the live (non-disabled) code of this
# notebook never imports it, so bring it into scope here to avoid a NameError.
from pyspark.sql.functions import month

# Filter for February data and remove unrealistic values:
# keep rows observed in month 2 whose air temperature lies strictly between
# -100 and 100 and whose atmospheric pressure lies strictly between 800 and 1100.
february_df = combined_df.filter(
    (month(combined_df.ObservationDate) == 2) &
    (combined_df.AirTemperature > -100) &
    (combined_df.AirTemperature < 100) &
    (combined_df.AtmosphericPressure > 800) &
    (combined_df.AtmosphericPressure < 1100)
)

# Create a temporary view so the filtered data can be queried with Spark SQL
february_df.createOrReplaceTempView("february_data")

In [None]:
def _save_as_single_csv(frame, target):
    """Write *frame* to *target* as CSV with a header row, from a single partition, replacing prior output."""
    frame.coalesce(1).write.mode("overwrite").option("header", True).csv(target)


# 1. Count the number of records total
total_count = february_df.count()
count_df = spark.createDataFrame([(total_count,)], ["TotalCount"])
_save_as_single_csv(count_df, "february_count.csv")

# 2. Average air temperature for month of February for each year
avg_temp = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, AVG(AirTemperature) as AvgTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate)
    ORDER BY Year
""")
_save_as_single_csv(avg_temp, "february_avg_temp.csv")

# 3. Median air temperature for month of February for each year
#    (percentile_approx at 0.5, i.e. an approximate median)
median_temp = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, 
           percentile_approx(AirTemperature, 0.5) as MedianTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate)
    ORDER BY Year
""")
_save_as_single_csv(median_temp, "february_median_temp.csv")

# 4. Standard Deviation of air temperature for month of February for each year
stddev_temp = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, STDDEV(AirTemperature) as StdDevTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate)
    ORDER BY Year
""")
_save_as_single_csv(stddev_temp, "february_stddev_temp.csv")

# 5. AVG air temperature per weather station in the month of February for each year
avg_temp_station = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, WeatherStation, AVG(AirTemperature) as AvgTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate), WeatherStation
    ORDER BY Year, WeatherStation
""")
_save_as_single_csv(avg_temp_station, "february_avg_temp_by_station.csv")

# Display results
print(f"Total count of February records: {total_count}")

print("\nAverage air temperature for February by year:")
avg_temp.show()

print("\nMedian air temperature for February by year:")
median_temp.show()

print("\nStandard deviation of air temperature for February by year:")
stddev_temp.show()

print("\nAverage air temperature per station for February by year (first 20 rows):")
avg_temp_station.show(n=20)

# Uncomment this if you want to produce the csv files in MinIO instead of the local directory.
# I also ran the code below, so you can see my final_lab csv files under the gchoi6 folder in MinIO.
# NOTE: the disabled block is wrapped in ''' rather than """ because it contains
# """-delimited SQL strings; an outer """ would be closed by the first inner """
# and the rest of the block would fail to parse.
'''
# 1. Count the number of records total
total_count = february_df.count()
count_df = spark.createDataFrame([(total_count,)], ["TotalCount"])
count_df.coalesce(1).write.csv("s3a://gchoi6/final_lab_output/february_count.csv", header=True, mode="overwrite")

# 2. Average air temperature for month of February for each year
avg_temp = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, AVG(AirTemperature) as AvgTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate)
    ORDER BY Year
""")
avg_temp.coalesce(1).write.csv("s3a://gchoi6/final_lab_output/february_avg_temp.csv", header=True, mode="overwrite")

# 3. Median air temperature for month of February for each year
median_temp = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, 
           percentile_approx(AirTemperature, 0.5) as MedianTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate)
    ORDER BY Year
""")
median_temp.coalesce(1).write.csv("s3a://gchoi6/final_lab_output/february_median_temp.csv", header=True, mode="overwrite")

# 4. Standard Deviation of air temperature for month of February for each year
stddev_temp = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, STDDEV(AirTemperature) as StdDevTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate)
    ORDER BY Year
""")
stddev_temp.coalesce(1).write.csv("s3a://gchoi6/final_lab_output/february_stddev_temp.csv", header=True, mode="overwrite")

# 5. AVG air temperature per StationID in the month of February for each year
avg_temp_station = spark.sql("""
    SELECT YEAR(ObservationDate) as Year, WeatherStation, AVG(AirTemperature) as AvgTemperature
    FROM february_data
    GROUP BY YEAR(ObservationDate), WeatherStation
    ORDER BY Year, WeatherStation
""")
avg_temp_station.coalesce(1).write.csv("s3a://gchoi6/final_lab_output/february_avg_temp_by_station.csv", header=True, mode="overwrite")
'''