In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local").appName("PySpark_MySQL_test").getOrCreate()

In [3]:
spark

In [4]:
import findspark
findspark.init()

In [5]:
import os
# import path
mysql_jar_path = os.path.join("driver", "mysql-connector-j-8.2.0.jar")
spark = SparkSession.builder \
    .appName("PySpark MySQL Connection") \
    .config("spark.jars", mysql_jar_path) \
    .getOrCreate()

In [6]:
url = "jdbc:mysql://127.0.0.1:3306/adventureworks"
properties = {
    "user": "root",
    "password": "pass123",
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [11]:
employee_df = spark.read.format("jdbc").option("url", url).option("dbtable", "employee").options(**properties).load()
product_df = spark.read.format("jdbc").option("url", url).option("dbtable", "product").options(**properties).load()
customer_df = spark.read.format("jdbc").option("url", url).option("dbtable", "customer").options(**properties).load()
salesorderheader_df = spark.read.format("jdbc").option("url", url).option("dbtable", "salesorderheader").options(**properties).load()
salesorderdetail_df = spark.read.format("jdbc").option("url", url).option("dbtable", "salesorderdetail").options(**properties).load()

In [14]:
print("Employee Schema")
employee_df.printSchema()

print("Product Schema")
product_df.printSchema()

print("Customer Schema")
customer_df.printSchema()

print("Sales Header Schema")
salesorderheader_df.printSchema()

print("Sales Detail Schema")
salesorderdetail_df.printSchema()

Employee Schema
root
 |-- EmployeeID: integer (nullable = true)
 |-- NationalIDNumber: string (nullable = true)
 |-- ContactID: integer (nullable = true)
 |-- LoginID: string (nullable = true)
 |-- ManagerID: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- BirthDate: timestamp (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HireDate: timestamp (nullable = true)
 |-- SalariedFlag: boolean (nullable = true)
 |-- VacationHours: integer (nullable = true)
 |-- SickLeaveHours: integer (nullable = true)
 |-- CurrentFlag: boolean (nullable = true)
 |-- rowguid: binary (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)

Product Schema
root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: boolean (nullable = true)
 |-- FinishedGoodsFlag: boolean (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLe

In [21]:
product_sort_by_list_price = product_df.select("ProductID", "Name", "Color").orderBy("ListPrice")
product_sort_by_list_price.show()

+---------+--------------------+------+
|ProductID|                Name| Color|
+---------+--------------------+------+
|        1|     Adjustable Race|  null|
|        2|        Bearing Ball|  null|
|        3|     BB Ball Bearing|  null|
|        4|Headset Ball Bear...|  null|
|      316|               Blade|  null|
|      317|         LL Crankarm| Black|
|      318|         ML Crankarm| Black|
|      319|         HL Crankarm| Black|
|      320|     Chainring Bolts|Silver|
|      321|       Chainring Nut|Silver|
|      322|           Chainring| Black|
|      323|          Crown Race|  null|
|      324|         Chain Stays|  null|
|      325|             Decal 1|  null|
|      326|             Decal 2|  null|
|      327|           Down Tube|  null|
|      328|   Mountain End Caps|  null|
|      329|       Road End Caps|  null|
|      330|    Touring End Caps|  null|
|      331|            Fork End|  null|
+---------+--------------------+------+
only showing top 20 rows



0. From the following tables write a SQL query to get all product names and sales order IDs. Order the result set on product name column.

In [35]:
# creating a temp view from product df
product_view = product_df.createOrReplaceTempView("product")
salesorderheader = salesorderheader_df.createOrReplaceTempView("salesorderheader")
salesorderdetail = salesorderdetail_df.createOrReplaceTempView("salesorderdetail")

In [38]:
result = spark.sql("""
    SELECT
        p.Name,
        soh.SalesOrderID
    FROM salesorderheader soh
    inner join salesorderdetail sod on soh.SalesOrderID = sod.SalesOrderID
    inner join product p on sod.ProductID = p.ProductID
""")
result.count()

121317