In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from dotenv import load_dotenv
import os
import boto3
from datetime import datetime

# Load the project's .env file so the credentials below can be read from the
# process environment.
load_dotenv(r"C:\Users\gowth\Desktop\Final_project\batch\.env")

# Snowflake connection settings; each value is None when its variable is unset.
sf_url = os.environ.get("SFURL")
sf_user = os.environ.get("SFUSER")
sf_password = os.environ.get("SFPASSWORD")
sf_database = os.environ.get("SFDATABASE")
sf_schema = os.environ.get("SFSCHEMA")
sf_warehouse = os.environ.get("SFWAREHOUSE")
sf_role = os.environ.get("SFROLE")

# Password used for the MySQL JDBC connection.
my_pass = os.environ.get("PASSWORD")

# Spark session factory
def spark_create():
    """Build (or reuse) the SparkSession used by this notebook.

    The session is named "MySQL_Snowflake_Integration". It is configured with
    the local MySQL Connector/J jar through ``spark.jars`` and with the
    Snowflake Spark connector plus Snowflake JDBC driver through
    ``spark.jars.packages``.

    Returns:
        The session object returned by ``getOrCreate()``.
    """
    mysql_driver_jar = r"C:\Users\gowth\Desktop\Final_project\mysql-connector-j-9.3.0\mysql-connector-j-9.3.0.jar"
    snowflake_packages = "net.snowflake:spark-snowflake_2.12:3.1.1,net.snowflake:snowflake-jdbc:3.13.6"

    builder = SparkSession.builder.appName("MySQL_Snowflake_Integration")
    builder = builder.config("spark.jars", mysql_driver_jar)
    builder = builder.config("spark.jars.packages", snowflake_packages)
    return builder.getOrCreate()
# MySQL JDBC connection settings
def spark_mysql_connection(spark):
    """Return the JDBC URL and connection properties for the local MySQL database.

    Args:
        spark: Accepted so existing call sites keep working; not used here.

    Returns:
        tuple: ``(jdbc_url, connection_properties)``. The URL points at the
        ``db_guvi`` database on ``localhost:3306``. The properties dict carries
        the user, the password read from the module-level ``my_pass``, the
        driver class name and a ``timestampTimezone`` entry.
    """
    mysql_props = dict(
        user="root",
        password=my_pass,
        driver="com.mysql.cj.jdbc.Driver",
        timestampTimezone="Asia/Kolkata",
    )
    mysql_url = "jdbc:mysql://localhost:3306/db_guvi?useSSL=false&serverTimezone=UTC"
    return mysql_url, mysql_props
# Incremental read of station_data from MySQL
def weather_data_mysql(spark,jdbc_url,connection_properties):
    """Read the rows of ``station_data`` loaded after the tracked watermark.

    The watermark is ``last_loaded_time`` from the first row Spark returns for
    the ``load_time_tracker`` table, shifted back by 5 hours 30 minutes.
    NOTE(review): the shift looks like an IST -> UTC correction (the
    connection properties mention "Asia/Kolkata") -- confirm.

    When the tracker table is empty, or its watermark is NULL, there is no
    previous load to continue from, so the whole ``station_data`` table is
    read instead of failing.

    Args:
        spark: Session used for the JDBC reads.
        jdbc_url: JDBC URL of the MySQL database.
        connection_properties: JDBC connection properties (user, password,
            driver, ...).

    Returns:
        DataFrame of the selected ``station_data`` rows with ``loaded_time``
        shifted back by 5 hours 30 minutes.
    """
    tracker_df = spark.read.jdbc(
        url=jdbc_url,
        table="load_time_tracker",
        properties=connection_properties,
    )
    tracker_df = tracker_df.withColumn(
        "loaded_time_corrected",
        expr("last_loaded_time - INTERVAL 5 HOURS 30 MINUTES"),
    )

    # first() returns None for an empty DataFrame, so guard before indexing.
    # NOTE(review): if the tracker can hold several rows, first() picks
    # whichever one the database returns first -- confirm it is single-row.
    tracker_row = tracker_df.select("loaded_time_corrected").first()
    corrected_time = tracker_row[0] if tracker_row is not None else None

    if corrected_time is None:
        # No usable watermark yet: fall back to a full read.
        source_table = "station_data"
    else:
        # The watermark comes from our own tracker table, not from user input.
        source_table = f"(SELECT * FROM station_data WHERE loaded_time > '{corrected_time}') AS filtered_data"

    station_df = spark.read.jdbc(
        url=jdbc_url,
        table=source_table,
        properties=connection_properties,
    )
    return station_df.withColumn(
        "loaded_time",
        expr("loaded_time - INTERVAL 5 HOURS 30 MINUTES"),
    )

# Sensor CSV input
def read_sensor_csv(spark, path=r"C:\Users\gowth\Desktop\Final_project\faker_output"):
    """Read the sensor CSV files into a DataFrame.

    The files are read with a header row and with schema inference enabled.

    Args:
        spark: Session used for the read.
        path: File or folder to read. Defaults to the project's
            ``faker_output`` folder, so existing ``read_sensor_csv(spark)``
            calls behave as before.

    Returns:
        DataFrame produced by ``spark.read.csv``.
    """
    reader = spark.read.option("header", "True")
    reader = reader.option("inferSchema", "True")
    return reader.csv(path)
# Append a DataFrame to a MySQL table
def spark_to_mysql(df,spark,jdbc_url,tablename,connection_props):
    """Append ``df`` to the MySQL table ``tablename`` over JDBC.

    Args:
        df: DataFrame to write.
        spark: Accepted so existing call sites keep working; not used here.
        jdbc_url: JDBC URL of the target database.
        tablename: Name of the target table.
        connection_props: JDBC connection properties (user, password, driver, ...).
    """
    writer = df.write
    writer.jdbc(
        url=jdbc_url,
        table=tablename,
        mode="append",
        properties=connection_props,
    )


# Append a DataFrame to a Snowflake table
def snowflake_loading(df,spark,tablename):
    """Append ``df`` to the Snowflake table ``tablename``.

    Connection settings come from the module-level ``sf_*`` values that were
    read from the environment.

    Args:
        df: DataFrame to write.
        spark: Accepted so existing call sites keep working; not used here.
        tablename: Target table, passed to the connector as ``dbtable``.
    """
    connection_options = dict(
        sfURL=sf_url,
        sfUser=sf_user,
        sfPassword=sf_password,
        sfDatabase=sf_database,
        sfSchema=sf_schema,
        sfWarehouse=sf_warehouse,
        sfRole=sf_role,
    )

    writer = df.write.format("snowflake")
    writer = writer.options(**connection_options)
    writer = writer.option("dbtable", tablename)
    writer.mode("append").save()

def upload_to_s3(bucket_name='backupdataguvi', s3_folder='sensor_backup/',
                 local_folder=r"C:\Users\gowth\Desktop\Final_project\faker_output"):
    """Back up local CSV files to S3 and delete each file once it is uploaded.

    Every ``*.csv`` entry in ``local_folder`` is uploaded under
    ``s3_folder`` with a ``YYYYmmdd_HHMMSS_`` prefix on the file name. A file
    is removed locally only after its upload succeeded. Upload failures and
    cleanup failures are reported separately, and processing continues with
    the next file.

    AWS credentials are read from ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` in the environment.

    Args:
        bucket_name: Destination bucket.
        s3_folder: Key prefix prepended to each uploaded object name.
        local_folder: Folder scanned for ``.csv`` files.
    """
    aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
    aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )

    for filename in os.listdir(local_folder):
        if not filename.endswith('.csv'):
            continue
        print(filename)
        local_path = os.path.join(local_folder, filename)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        s3_key = s3_folder + f"{timestamp}_{filename}"

        # Only the upload itself is reported as an upload failure.
        try:
            s3_client.upload_file(local_path, bucket_name, s3_key)
        except Exception as e:
            print(f"Failed to upload {filename}: {e}")
            continue
        print(f"Uploaded: {filename} -> s3://{bucket_name}/{s3_key}")

        # Cleanup stays best-effort: a failure here must not be reported as an
        # upload failure and must not stop the remaining files.
        try:
            os.remove(local_path)
            print(f"🗑️ Deleted local file: {filename}")
        except Exception as e:
            print(f"Uploaded {filename} but local cleanup failed: {e}")
    


In [3]:
# Start (or reuse) the Spark session for this notebook.
spark = spark_create()

In [4]:
# JDBC URL and connection properties for the MySQL source.
jdbc_url, connection_properties = spark_mysql_connection(spark)

In [5]:
# Rows of station_data loaded since the tracked watermark.
df = weather_data_mysql(spark, jdbc_url, connection_properties)

In [6]:
# Preview the rows read from MySQL.
df.show()

+----------+----------+--------+--------+------------+----------------+-------------------+
|station_id|      date|min_temp|max_temp|avg_humidity|precipitation_mm|        loaded_time|
+----------+----------+--------+--------+------------+----------------+-------------------+
|     S1968|2025-06-05|    24.4|    36.2|       31.93|            0.46|2025-06-22 14:08:34|
|     S9042|2025-06-20|    15.2|    28.4|       47.02|           14.95|2025-06-22 14:08:34|
|     S1135|2024-10-30|     6.5|    16.0|       44.27|            0.17|2025-06-22 14:08:34|
|     S8444|2024-11-02|    29.2|    35.0|       86.33|           13.31|2025-06-22 14:08:34|
|     S2510|2025-02-01|    24.7|    34.6|       36.19|            3.68|2025-06-22 14:08:34|
|     S7533|2025-02-12|    -2.9|     2.5|       71.05|            0.29|2025-06-22 14:08:34|
|     S7303|2024-11-15|    -9.7|     2.1|       56.83|            0.91|2025-06-22 14:08:34|
|     S7131|2025-05-02|     2.3|    14.4|       49.32|           13.87|2025-06-2

In [8]:
# Add back the 5 hours 30 minutes that weather_data_mysql subtracted from
# loaded_time, then preview the result.
cr_station_df = df.withColumn(
    "loaded_time",
    expr("loaded_time + INTERVAL 5 HOURS 30 MINUTES"),
)
cr_station_df.show()

+----------+----------+--------+--------+------------+----------------+-------------------+
|station_id|      date|min_temp|max_temp|avg_humidity|precipitation_mm|        loaded_time|
+----------+----------+--------+--------+------------+----------------+-------------------+
|     S1968|2025-06-05|    24.4|    36.2|       31.93|            0.46|2025-06-22 19:38:34|
|     S9042|2025-06-20|    15.2|    28.4|       47.02|           14.95|2025-06-22 19:38:34|
|     S1135|2024-10-30|     6.5|    16.0|       44.27|            0.17|2025-06-22 19:38:34|
|     S8444|2024-11-02|    29.2|    35.0|       86.33|           13.31|2025-06-22 19:38:34|
|     S2510|2025-02-01|    24.7|    34.6|       36.19|            3.68|2025-06-22 19:38:34|
|     S7533|2025-02-12|    -2.9|     2.5|       71.05|            0.29|2025-06-22 19:38:34|
|     S7303|2024-11-15|    -9.7|     2.1|       56.83|            0.91|2025-06-22 19:38:34|
|     S7131|2025-05-02|     2.3|    14.4|       49.32|           13.87|2025-06-2

In [9]:
# Append the corrected station rows to the Snowflake table.
snowflake_loading(df=cr_station_df, spark=spark, tablename="weather_station_data")
