In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
import pandas as pd

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Multi-Outlet ETL") \
    .config("spark.jars", "C:\Program Files (x86)\MySQL\MySQL jdbc\mysql-connector-j-9.0.0.jar") \
    .getOrCreate()

# MySQL connection parameters
def get_jdbc_url(host, db):
    return f"jdbc:mysql://{host}/{db}"

# Function to extract data from MySQL for each outlet
def extract_data_from_mysql(table, connection_params):
    return spark.read.format("jdbc") \
        .option("url", connection_params['url']) \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", table) \
        .option("user", connection_params['user']) \
        .option("password", connection_params['password']) \
        .load()

# Connection parameters for each outlet
london_conn = {
    'url': get_jdbc_url('localhost', 'london_db'),
    'user': '***',
    'password': '***'
}

paris_conn = {
    'url': get_jdbc_url('localhost', 'paris_db'),
    'user': '***',
    'password': '***'
}

newyork_conn = {
    'url': get_jdbc_url('localhost', 'newyork_db'),
    'user': '***',
    'password': '***'
}

# Extract Data from each outlet
london_customers = extract_data_from_mysql('london_customers', london_conn)
paris_clients = extract_data_from_mysql('paris_clients', paris_conn)
nyc_customers = extract_data_from_mysql('nyc_customers', newyork_conn)

london_inventory = extract_data_from_mysql('london_inventory', london_conn)
paris_inventory = extract_data_from_mysql('paris_inventaire', paris_conn)
nyc_inventory = extract_data_from_mysql('nyc_inventory', newyork_conn)

london_sales = extract_data_from_mysql('london_sales', london_conn)
paris_sales = extract_data_from_mysql('paris_ventes', paris_conn)
nyc_transactions = extract_data_from_mysql('nyc_transactions', newyork_conn)

# --- Transformations ---

# 1. Customers: Combine customer data from all outlets
customers_london = london_customers.withColumnRenamed('Customer_ID', 'CustomerID').withColumnRenamed('Customer_Name', 'Name')
customers_paris = paris_clients.withColumnRenamed('Identifiant_Client', 'CustomerID').withColumnRenamed('Nom_Client', 'Name')
customers_nyc = nyc_customers.withColumnRenamed('Customer_ID', 'CustomerID').withColumnRenamed('Name', 'Name')

# Union all customer data
customers = customers_london.union(customers_paris).union(customers_nyc)

# 2. Local Inventory: Combine inventory from all outlets with location information
inventory_london = london_inventory.withColumn('Location', lit('London')).withColumnRenamed('Product_ID', 'ProductID').withColumnRenamed('Product_Name', 'ProductName').withColumnRenamed('Price', 'UnitPrice')
inventory_paris = paris_inventory.withColumn('Location', lit('Paris')).withColumnRenamed('Identifiant_Produit', 'ProductID').withColumnRenamed('Nom_Produit', 'ProductName').withColumnRenamed('Prix_Unitaire', 'UnitPrice')
inventory_nyc = nyc_inventory.withColumn('Location', lit('New York')).withColumnRenamed('Product_ID', 'ProductID').withColumnRenamed('Product_Name', 'ProductName').withColumnRenamed('Unit_Price', 'UnitPrice')

# Union all inventory data
local_inventory = inventory_london.union(inventory_paris).union(inventory_nyc)

# 3. Transactions: Combine sales/transaction data from all outlets with location information
transactions_london = london_sales.withColumn('Location', lit('London')).withColumnRenamed('Order_ID', 'TransactionID').withColumnRenamed('Customer_ID', 'CustomerID').withColumnRenamed('Product_ID', 'ProductID').withColumnRenamed('Quantity', 'Quantity')
transactions_paris = paris_sales.withColumn('Location', lit('Paris')).withColumnRenamed('Numero_Vente', 'TransactionID').withColumnRenamed('Identifiant_Client', 'CustomerID').withColumnRenamed('Identifiant_Produit', 'ProductID').withColumnRenamed('Quantité', 'Quantity')
transactions_nyc = nyc_transactions.withColumn('Location', lit('New York')).withColumnRenamed('Transaction_ID', 'TransactionID').withColumnRenamed('Customer_ID', 'CustomerID').withColumnRenamed('Product_ID', 'ProductID').withColumnRenamed('Quantity', 'Quantity')

# Union all transaction data
transactions = transactions_london.union(transactions_paris).union(transactions_nyc)

# --- Load into Destination ---
customers.write.mode("overwrite").parquet("s3a://spark-job-destination/customers/")
local_inventory.write.mode("overwrite").parquet("s3a://spark-job-destination/local_inventory/")
transactions.write.mode("overwrite").parquet("s3a://spark-job-destination/transactions/")

# Stop the Spark Session
spark.stop()


Py4JJavaError: An error occurred while calling o58.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
