In [17]:
import os

import pymysql
from sshtunnel import SSHTunnelForwarder
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DecimalType

### Database credentials and server details

In [28]:
# Connection settings are read from environment variables so credentials never
# have to be typed into (and committed with) the notebook. Every default equals
# the value this cell used to hard-code, so leaving a variable unset behaves as before.
remote_host = os.environ.get("SSH_REMOTE_HOST", "206.189.103.247")   # Remote server IP
remote_port = int(os.environ.get("MARIADB_REMOTE_PORT", "3306"))      # MariaDB port on the remote host
local_port = int(os.environ.get("TUNNEL_LOCAL_PORT", "3307"))         # Local port the tunnel listens on
ssh_username = os.environ.get("SSH_USERNAME", "")                     # SSH login user
ssh_private_key = os.environ.get("SSH_PRIVATE_KEY", "")               # Path to the SSH private key file
db_config = {
    "host": "127.0.0.1",                                  # Connect via the local end of the tunnel
    "user": os.environ.get("MARIADB_USER", ""),           # MariaDB username
    "password": os.environ.get("MARIADB_PASSWORD", ""),   # MariaDB password
    "database": os.environ.get("MARIADB_DATABASE", ""),   # Database selected on connect
    "port": local_port,
}

### Establish SSH tunnel and database connection

In [19]:
def connect_to_database():
    """Open an SSH tunnel to the remote host and a MariaDB connection through it.

    Reads the module-level settings ``remote_host``, ``remote_port``,
    ``local_port``, ``ssh_username``, ``ssh_private_key`` and ``db_config``.

    Returns:
        ``(connection, tunnel)`` on success. On any failure the error is
        printed, a tunnel that was already created is stopped, and
        ``(None, None)`` is returned.
    """
    tunnel = None
    try:
        tunnel = SSHTunnelForwarder(
            (remote_host, 22),
            ssh_username=ssh_username,
            # `ssh_pkey` is sshtunnel's current name for the key argument;
            # `ssh_private_key` is only a deprecated alias.
            ssh_pkey=ssh_private_key,
            remote_bind_address=('127.0.0.1', remote_port),
            local_bind_address=('127.0.0.1', local_port)
        )
        tunnel.start()
        print(f"SSH tunnel established. Local port: {tunnel.local_bind_port}")

        # Connect to the port the tunnel actually bound instead of trusting
        # db_config["port"], so the two can never drift apart.
        connection = pymysql.connect(**{**db_config, "port": tunnel.local_bind_port})
        print("Connected to the MariaDB database!")
        return connection, tunnel

    except Exception as e:
        print(f"Error setting up SSH or connecting to MariaDB: {e}")
        # Without this, a failed DB login leaves the tunnel running and the
        # local port bound, so the next attempt fails with "address in use".
        if tunnel is not None:
            try:
                tunnel.stop()
            except Exception:
                # Best-effort cleanup; the original error was already reported.
                pass
        return None, None

### Fetch data from the database

In [20]:
def fetch_data_from_db(connection, query, params=None):
    """Run ``query`` on ``connection`` and return every row it produces.

    Args:
        connection: An open DB-API style connection (a ``pymysql`` connection
            in this notebook) whose ``cursor()`` is usable as a context manager.
        query: SQL text to execute. With pymysql, values are referenced via
            ``%s`` / ``%(name)s`` placeholders.
        params: Optional sequence or mapping bound to the placeholders by the
            driver. Passing values here, rather than formatting them into
            ``query``, avoids SQL injection. ``None`` (default) binds nothing,
            exactly like calling ``execute(query)``.

    Returns:
        Whatever ``cursor.fetchall()`` returns (possibly empty), or ``None``
        if executing or fetching raised; the error is printed in that case.
    """
    try:
        with connection.cursor() as cursor:
            cursor.execute(query, params)
            data = cursor.fetchall()
            print("Data fetched successfully from the database!")
            return data
    except Exception as e:
        print(f"Error fetching data: {e}")
        return None

### Process data using PySpark

In [26]:
def process_data_with_spark(data, schema):
    """Load fetched rows into a Spark DataFrame and print it.

    Args:
        data: Rows to load (e.g. what ``fetch_data_from_db`` returned).
        schema: Passed straight to ``createDataFrame``; in this notebook it is
            a list of column names.

    Returns:
        The new DataFrame, or ``None`` if any step raised (the error is printed).
    """
    try:
        # getOrCreate reuses a running session with these settings if there is one.
        builder = SparkSession.builder.master("local[*]").appName("MariaDBConnection")
        session = builder.getOrCreate()
        result_df = session.createDataFrame(data, schema=schema)
        # Print all columns in full instead of truncating long cell values.
        result_df.show(truncate=False)
    except Exception as err:
        print(f"Error processing data with PySpark: {err}")
        return None
    return result_df

### Main ETL execution

In [27]:
if __name__ == "__main__":
    # Placeholders: replace with the real SQL and one column name per selected column.
    query = """ Your Query here """
    schema = ["col1", "col2", ]

    spark_df = None  # Keeps the resulting DataFrame available to later cells.
    connection, tunnel = connect_to_database()
    if connection and tunnel:
        try:
            data = fetch_data_from_db(connection, query)
            if data:
                spark_df = process_data_with_spark(data, schema)
            elif data is not None:
                # The query ran but matched nothing; say so instead of passing silently.
                print("Query returned no rows; nothing to process.")
        finally:
            # Close the database connection and tunnel. The nested finally makes
            # sure the tunnel is stopped even if closing the connection raises.
            try:
                connection.close()
            finally:
                tunnel.stop()
            print("Connection and tunnel closed.")

[2024-12-28T16:28:47.822+0200] {transport.py:1944} INFO - Connected (version 2.0, client OpenSSH_8.0)
[2024-12-28T16:28:48.335+0200] {transport.py:1944} INFO - Authentication (publickey) successful!
SSH tunnel established. Local port: 3309
Connected to the MariaDB database!
Data fetched successfully from the database!
Connection and tunnel closed.
