**Using the Spark SQL API to run SQL queries**:

I defined two functions. The first, spark_df_reader, connects to the MySQL database through the JDBC driver and reads the requested tables into a dictionary called dfs, whose keys are the table names and whose values are the corresponding dataframes. The second, temporary_view_registrator, registers each Spark dataframe (one per table in the database) as a temporary view so that SQL queries can be run against it directly.


In [1]:
import findspark
findspark.init('/home/danial/spark-3.4.0-bin-hadoop3')
import pyspark 
import os
password = os.environ.get('MYSQL_PASSWORD')

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MySQL Session").getOrCreate()

23/07/28 21:00:23 WARN Utils: Your hostname, danial-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/07/28 21:00:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/28 21:00:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def spark_df_reader(database_name, table_names):
    """Read the given MySQL tables into Spark dataframes over JDBC.

    table_names is a list of table names in the database to connect to.
    Returns a dictionary mapping each table name to its dataframe.
    """
    mysql_url = f"jdbc:mysql://localhost:3306/{database_name}"

    mysql_properties = {
        "user": "root",
        "password": password,  # read from the MYSQL_PASSWORD environment variable above
        "driver": "com.mysql.jdbc.Driver",
    }

    dfs = {}
    for one_table in table_names:
        dfs[one_table] = spark.read.jdbc(url=mysql_url, table=one_table, properties=mysql_properties)

    return dfs

In [4]:
# Register each Spark dataframe (one per table in the database) as a temporary view so that SQL queries can be run against it directly

def temporary_view_registrator(dfs):
    """Register every dataframe in dfs as a temporary view named after its table.

    dfs is a dictionary whose keys are table names and whose values are the corresponding dataframes.
    Returns nothing; the views are created as a side effect.
    """
    for table_name, df in dfs.items():
        df.createOrReplaceTempView(table_name)
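
The function above returns nothing, so the caller has to remember which views exist. As a rough sketch, a variant could hand back the registered names. The name `register_temp_views` and the `FakeFrame` stand-in are hypothetical and not part of the notebook; the stand-in only exists so the example runs without a Spark session, and it mimics the one method the function relies on.

```python
def register_temp_views(dfs):
    """Register each dataframe under its table name and return the view names.

    Hypothetical variant of temporary_view_registrator; it only relies on
    the createOrReplaceTempView method that Spark dataframes provide.
    """
    registered = []
    for table_name, df in dfs.items():
        df.createOrReplaceTempView(table_name)
        registered.append(table_name)
    return registered


class FakeFrame:
    """Stand-in for a Spark dataframe so the sketch runs without Spark."""

    def __init__(self):
        self.view_name = None

    def createOrReplaceTempView(self, name):
        # Only record the name; a real dataframe would register a view.
        self.view_name = name


frames = {"Customer": FakeFrame(), "Product": FakeFrame()}
view_names = register_temp_views(frames)
print(view_names)  # ['Customer', 'Product']
```

With real dataframes, the returned list could be used to confirm which names are available to spark.sql.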

### Leetcode Question 1757: Recyclable and Low Fat Products

In [5]:
dfs = spark_df_reader('Leetcode_Q_1757', ['Products'])

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
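
The warning above names `com.mysql.cj.jdbc.Driver` as the replacement driver class. A minimal sketch of building the connection settings with that class follows. The helper name `build_mysql_jdbc_config` and its parameters are assumptions for illustration, not part of the notebook, and the sketch assumes a Connector/J version that ships the new class.

```python
import os


def build_mysql_jdbc_config(database_name, password=None,
                            host="localhost", port=3306, user="root"):
    """Return (url, properties) suitable for spark.read.jdbc.

    Hypothetical helper. If no password is passed, it falls back to the
    MYSQL_PASSWORD environment variable and fails early when it is unset.
    """
    if password is None:
        password = os.environ.get("MYSQL_PASSWORD")
    if password is None:
        raise RuntimeError("MYSQL_PASSWORD is not set")

    url = f"jdbc:mysql://{host}:{port}/{database_name}"
    properties = {
        "user": user,
        "password": password,
        # Driver class named in the deprecation warning
        "driver": "com.mysql.cj.jdbc.Driver",
    }
    return url, properties


url, properties = build_mysql_jdbc_config("Leetcode_Q_1757", password="example")
print(url)
```

The returned pair could then be passed along as `spark.read.jdbc(url=url, table='Products', properties=properties)`.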


In [6]:
dfs['Products'].show()

+----------+--------+----------+
|product_id|low_fats|recyclable|
+----------+--------+----------+
|         0|       Y|         N|
|         1|       Y|         Y|
|         2|       N|         Y|
|         3|       Y|         Y|
|         4|       N|         N|
+----------+--------+----------+



In [31]:
temporary_view_registrator(dfs)

In [32]:
spark.sql("""
SELECT *
FROM Products
""").show()

+----------+--------+----------+
|product_id|low_fats|recyclable|
+----------+--------+----------+
|         0|       Y|         N|
|         1|       Y|         Y|
|         2|       N|         Y|
|         3|       Y|         Y|
|         4|       N|         N|
+----------+--------+----------+
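
LeetCode 1757 asks for the ids of products that are both low fat and recyclable, which is a filter on the two flag columns. A minimal sketch of that query follows. So that it runs without the MySQL database or a Spark session, it loads the sample rows shown above into Python's built-in sqlite3, which is a stand-in and not what the notebook uses. The SELECT itself is plain SQL and would be expected to work when passed to spark.sql against the Products view, though that is not verified here.

```python
import sqlite3

# In-memory stand-in for the Products table, using the rows shown above
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Products (product_id INTEGER, low_fats TEXT, recyclable TEXT)"
)
conn.executemany(
    "INSERT INTO Products VALUES (?, ?, ?)",
    [(0, "Y", "N"), (1, "Y", "Y"), (2, "N", "Y"), (3, "Y", "Y"), (4, "N", "N")],
)

# Keep only products flagged 'Y' in both columns
query_1757 = """
SELECT product_id
FROM Products
WHERE low_fats = 'Y' AND recyclable = 'Y'
ORDER BY product_id
"""
low_fat_recyclable = [row[0] for row in conn.execute(query_1757)]
print(low_fat_recyclable)  # [1, 3]
```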



### Leetcode Question 1045: Customers Who Bought All Products

In [4]:
dfs = spark_df_reader('Leetcode_Q_1045', ['Customer', 'Product'])

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.


In [6]:
dfs['Customer'].show()

+-----------+-----------+
|customer_id|product_key|
+-----------+-----------+
|          1|          5|
|          2|          6|
|          3|          5|
|          3|          6|
|          1|          6|
+-----------+-----------+



In [7]:
dfs['Product'].show()

+-----------+
|product_key|
+-----------+
|          5|
|          6|
+-----------+



In [11]:
temporary_view_registrator(dfs)

In [18]:
query1 = "SELECT * FROM Customer"
result1 = spark.sql(query1)
result1.show()

+-----------+-----------+
|customer_id|product_key|
+-----------+-----------+
|          1|          5|
|          2|          6|
|          3|          5|
|          3|          6|
|          1|          6|
+-----------+-----------+



In [19]:
query1 = "SELECT * FROM Product"
result1 = spark.sql(query1)
result1.show()

+-----------+
|product_key|
+-----------+
|          5|
|          6|
+-----------+



In [29]:
spark.sql("""
SELECT c.customer_id, c.product_key
FROM Customer c
JOIN Product p 
    USING (product_key)
ORDER BY c.customer_id
""").show()

+-----------+-----------+
|customer_id|product_key|
+-----------+-----------+
|          1|          5|
|          1|          6|
|          2|          6|
|          3|          5|
|          3|          6|
+-----------+-----------+
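
The join above lists every purchase. LeetCode 1045 itself asks for the customers who bought every product in Product. One common way to express that is to count the distinct products per customer and compare the count with the size of Product. The sketch below uses Python's built-in sqlite3 with the sample rows shown above so that it runs without MySQL or Spark. It is a stand-in, and whether spark.sql accepts the same statement unchanged is an assumption not tested here.

```python
import sqlite3

# In-memory stand-ins for the Customer and Product tables shown above
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Customer (customer_id INTEGER, product_key INTEGER)")
conn.execute("CREATE TABLE Product (product_key INTEGER)")
conn.executemany(
    "INSERT INTO Customer VALUES (?, ?)",
    [(1, 5), (2, 6), (3, 5), (3, 6), (1, 6)],
)
conn.executemany("INSERT INTO Product VALUES (?)", [(5,), (6,)])

# A customer qualifies when their distinct product count equals the
# total number of products
query_1045 = """
SELECT customer_id
FROM Customer
GROUP BY customer_id
HAVING COUNT(DISTINCT product_key) = (SELECT COUNT(*) FROM Product)
ORDER BY customer_id
"""
bought_everything = [row[0] for row in conn.execute(query_1045)]
print(bought_everything)  # [1, 3]
```

COUNT(DISTINCT ...) matters here because a customer could buy the same product more than once.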

