# Spark ETL with a MySQL Database

1. Install the required Spark libraries
2. Create a connection to the MySQL database
3. Read data from the MySQL database
4. Transform the data
5. Write data into the MySQL database


### 1- Spark Libraries

Start a Spark session and load the required libraries, including the MySQL JDBC connector jar.

In [1]:
from pyspark.sql import SparkSession

In [56]:
spark = SparkSession.builder \
           .appName('JDBC Cnx') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar")\
           .getOrCreate()

In [57]:
spark
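The session above points `spark.jars` at the connector jar through a relative path, which is typically resolved against the working directory of the notebook process. If the jar is not there, the problem usually only surfaces later, when the read fails to load the driver class. A minimal sketch of an early check, assuming the jar sits in a known folder; `connector_jar_path` is a hypothetical helper, not part of PySpark:

```python
from pathlib import Path


def connector_jar_path(jar_name: str, search_dir: str = ".") -> str:
    """Return the absolute path of a JDBC connector jar, or fail early if it is missing.

    Hypothetical helper: call it before SparkSession.builder...getOrCreate(),
    because spark.jars only takes effect when the session is first created.
    """
    path = (Path(search_dir) / jar_name).resolve()
    if not path.is_file():
        raise FileNotFoundError(f"JDBC connector jar not found: {path}")
    return str(path)


# Usage (assumes the jar is in the notebook's working directory):
# spark = SparkSession.builder \
#     .appName("JDBC Cnx") \
#     .config("spark.jars", connector_jar_path("mysql-connector-java-8.0.13.jar")) \
#     .getOrCreate()
```

Returning an absolute path also makes the session configuration independent of where the notebook kernel was started.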

### 2- Create Connection and Read Data

In [4]:
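# Read the MySQL "orders" table into a DataFrame over JDBC.
# com.mysql.cj.jdbc.Driver is the driver class for Connector/J 8.x;
# the old com.mysql.jdbc.Driver name is deprecated.
mysql_df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://127.0.0.1:3306/spark_db") \
    .option("dbtable", "orders") \
    .option("user", "analystdata") \
    .option("password", "pwd#90") \
    .load()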
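The read above hard-codes the user name and password in the notebook. A minimal sketch of keeping credentials out of the code, assuming they are exported as the hypothetical environment variables `MYSQL_USER` and `MYSQL_PASSWORD`; the returned dict can be passed to `DataFrameReader.options(**opts)` (and likewise to `DataFrameWriter.options`). `mysql_jdbc_options` is a hypothetical helper:

```python
import os


def mysql_jdbc_options(table: str,
                       url: str = "jdbc:mysql://127.0.0.1:3306/spark_db") -> dict:
    """Build JDBC options for spark.read / df.write with format("jdbc").

    Credentials are read from the hypothetical MYSQL_USER and MYSQL_PASSWORD
    environment variables instead of being written into the notebook.
    """
    user = os.environ.get("MYSQL_USER")
    password = os.environ.get("MYSQL_PASSWORD")
    if not user or not password:
        raise RuntimeError("Set MYSQL_USER and MYSQL_PASSWORD before connecting")
    return {
        "driver": "com.mysql.cj.jdbc.Driver",  # Connector/J 8.x driver class
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
    }


# Usage:
# mysql_df = spark.read.format("jdbc").options(**mysql_jdbc_options("orders")).load()
```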

In [5]:
mysql_df.printSchema()

root
 |-- Row_ID: integer (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Discount: decimal(10,2) (nullable = true)
 |-- Unit_Price: decimal(10,2) (nullable = true)
 |-- Shipping_Cost: decimal(10,2) (nullable = true)
 |-- Customer_ID: integer (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Ship_Mode: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Sub_Category: string (nullable = true)
 |-- Product_Container: string (nullable = true)
 |-- Product_Name: string (nullable = true)
 |-- Product_Base_Margin: decimal(10,2) (nullable = true)
 |-- Region: string (nullable = true)
 |-- State_or_Province: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Postal_Code: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Profit: decimal(10,2) (nullable = true)
 |-- Quantity_ordered_new: integer (nul

In [6]:
mysql_df.show(3)

+------+--------------+--------+----------+-------------+-----------+---------------+-----------+----------------+----------------+--------------------+-----------------+--------------------+-------------------+-------+-----------------+---------+-----------+----------+---------+------+--------------------+-----+--------+
|Row_ID|Order_Priority|Discount|Unit_Price|Shipping_Cost|Customer_ID|  Customer_Name|  Ship_Mode|Customer_Segment|Product_Category|Product_Sub_Category|Product_Container|        Product_Name|Product_Base_Margin| Region|State_or_Province|     City|Postal_Code|Order_Date|Ship_Date|Profit|Quantity_ordered_new|Sales|Order_ID|
+------+--------------+--------+----------+-------------+-----------+---------------+-----------+----------------+----------------+--------------------+-----------------+--------------------+-------------------+-------+-----------------+---------+-----------+----------+---------+------+--------------------+-----+--------+
| 18606| Not Specified|    0
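By default, a JDBC read like the one in the cell above pulls the whole `orders` table through a single connection into one partition. Spark's JDBC source can split the read when `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` are given together; the bounds only set the partition stride and do not filter rows, and `numPartitions` also caps the number of concurrent connections. A minimal sketch of assembling those options for the integer `Row_ID` column; `partitioned_read_options` is hypothetical, and the bounds in the usage comment are placeholders, not the table's real range:

```python
def partitioned_read_options(column: str, lower: int, upper: int,
                             num_partitions: int) -> dict:
    """Return the four options Spark's JDBC source needs for a parallel read.

    lower/upper only decide the partition stride; rows outside the range are
    still read (they land in the first or last partition).
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower >= upper:
        raise ValueError("lower must be smaller than upper")
    return {
        "partitionColumn": column,
        "lowerBound": str(lower),
        "upperBound": str(upper),
        "numPartitions": str(num_partitions),
    }


# Usage (placeholder bounds; query MIN(Row_ID) and MAX(Row_ID) for real ones):
# mysql_df = spark.read.format("jdbc") \
#     .option("url", "jdbc:mysql://127.0.0.1:3306/spark_db") \
#     .option("dbtable", "orders") \
#     # ...driver, user and password options as in the read above...
#     .options(**partitioned_read_options("Row_ID", 1, 20000, 4)) \
#     .load()
```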

### 3- Write DataFrame

In [59]:
print("spark version=", spark.version)

spark version= 3.4.0


In [60]:
%pip show pyspark

Name: pyspark
Version: 3.4.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: C:\Users\MonPC\Desktop\01-ImenBH\Projects\PySpark\pyenv\Lib\site-packages
Requires: py4j
Required-by: 
Note: you may need to restart the kernel to use updated packages.
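The two cells above check the version twice: once through the running session and once through pip. A minimal sketch of doing the package check from Python with the standard library's `importlib.metadata` (Python 3.8+), so it can be compared with `spark.version` in the same cell; `installed_version` is a hypothetical helper:

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Usage in the notebook: the pip package and the running Spark should agree.
# assert installed_version("pyspark") == spark.version
```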


In [62]:
# Create an RDD by passing a Python list of (name, id) tuples to sparkContext.parallelize()
data = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
rdd = spark.sparkContext.parallelize(data)
rdd

ParallelCollectionRDD[9] at readRDDFromFile at PythonRDD.scala:287

In [66]:
# Convert the RDD to a DataFrame; rdd.toDF(column_names) would work as well
column_names = ["name", "id"]
df_test = spark.createDataFrame(rdd, schema=column_names)
df_test.printSchema()
df_test.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- id: long (nullable = true)

+---------+---+
|name     |id |
+---------+---+
|Finance  |10 |
|Marketing|20 |
|Sales    |30 |
|IT       |40 |
+---------+---+



In [69]:
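# Write df_test to a MySQL table named "df_test".
# The default save mode is "errorifexists", so re-running this cell fails once
# the table exists; add .mode("append") or .mode("overwrite") to change that.
df_test.write \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://127.0.0.1:3306/spark_db") \
    .option("dbtable", "df_test") \
    .option("user", "analystdata") \
    .option("password", "pwd#90") \
    .save()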
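The write above sends rows as batched INSERT statements (Spark's JDBC `batchsize` option, 1000 rows per round trip by default). With MySQL Connector/J, the `rewriteBatchedStatements=true` connection property lets the driver rewrite such batches into multi-row inserts, which often speeds up larger writes; whether it helps should be measured on your own data. A minimal sketch of building the JDBC URL with extra connection properties; `mysql_jdbc_url` is a hypothetical helper:

```python
from urllib.parse import urlencode


def mysql_jdbc_url(host: str, port: int, database: str, **properties) -> str:
    """Build a jdbc:mysql:// URL, appending Connector/J properties as a query string.

    Booleans are lower-cased ("true"/"false") to match how the properties
    are usually written in Connector/J documentation.
    """
    base = f"jdbc:mysql://{host}:{port}/{database}"
    if not properties:
        return base
    normalized = {
        key: str(value).lower() if isinstance(value, bool) else value
        for key, value in properties.items()
    }
    return f"{base}?{urlencode(normalized)}"


# Usage: append to the existing df_test table instead of failing if it exists.
# df_test.write.format("jdbc") \
#     .option("driver", "com.mysql.cj.jdbc.Driver") \
#     .option("url", mysql_jdbc_url("127.0.0.1", 3306, "spark_db",
#                                   rewriteBatchedStatements=True)) \
#     .option("dbtable", "df_test") \
#     .option("user", "analystdata") \
#     .option("password", "pwd#90") \
#     .mode("append") \
#     .save()
```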