### Performing the Extract and Transform portion using PySpark, quering the data with Spark and SQL then loading to a Postgres Database

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os

In [3]:
os.environ["JAVA_HOME"] ="C:\Program Files\Java\jdk-17"

In [4]:
conf = SparkConf() \
    .setAppName("Example") \
    .setMaster("local") \
    .set("spark.driver.extraClassPath", "C:/pyspark/*")
#
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [5]:
df = spark.read.options(delimiter=",", header=True).csv(
    r"C:\Users\lamarwells\Downloads\AdvWorksData.csv")
df.show()

+---------------+------------------+--------------------+--------------+--------------+------+-------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|productcategory|productsubcategory|             product| saleterritory|       Country|  City|         Sate|        Customer|   Employee|OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|  NetSales|OrderQuantity|    Sales|
+---------------+------------------+--------------------+--------------+--------------+------+-------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|       Clothing|              Caps|        AWC Logo Cap|United Kingdom|United Kingdom| Berks|      England|      Gary Suess|Amy Alberts|         1|  00:00.0|      6.9223|   5.0136|   

In [11]:
df.printSchema()


root
 |-- productcategory: string (nullable = true)
 |-- productsubcategory: string (nullable = true)
 |-- product: string (nullable = true)
 |-- saleterritory: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Sate: string (nullable = true)
 |-- Customer: string (nullable = true)
 |-- Employee: string (nullable = true)
 |-- OrderCount: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- StandardCost: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- UnitPriceDiscount: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- ListPrice: string (nullable = true)
 |-- SaleswithStandard: string (nullable = true)
 |-- NetSales: string (nullable = true)
 |-- OrderQuantity: string (nullable = true)
 |-- Sales: string (nullable = true)



In [13]:
print("Number of Rows:", df.count())
print("Number of Columns:", len(df.columns))

Number of Rows: 60880
Number of Columns: 20


In [14]:
france = df.filter(df.saleterritory == "France").show(truncate=False)

+---------------+------------------+------------------------------+-------------+-------+--------+--------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+--------+-------------+---------+
|productcategory|productsubcategory|product                       |saleterritory|Country|City    |Sate          |Customer        |Employee   |OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|NetSales|OrderQuantity|Sales    |
+---------------+------------------+------------------------------+-------------+-------+--------+--------------+----------------+-----------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+--------+-------------+---------+
|Components     |Handlebars        |HL Mountain Handlebars        |France       |France |Paris   |Seine (Paris) |Ty Loren Carlson|Amy Alberts|1         |00:00.0  |53.3999     

In [16]:
df1 = df[['productcategory', 'saleterritory', 'OrderDate', 'Sales']]
df1.show()

+---------------+--------------+---------+---------+
|productcategory| saleterritory|OrderDate|    Sales|
+---------------+--------------+---------+---------+
|       Clothing|United Kingdom|  00:00.0|68.786592|
|    Accessories|United Kingdom|  00:00.0|       90|
|       Clothing|United Kingdom|  00:00.0|  182.352|
|       Clothing|United Kingdom|  00:00.0| 317.5964|
|       Clothing|United Kingdom|  00:00.0|  159.558|
|       Clothing|United Kingdom|  00:00.0|   45.588|
|       Clothing|United Kingdom|  00:00.0|   22.794|
|       Clothing|United Kingdom|  00:00.0|   22.794|
|       Clothing|United Kingdom|  00:00.0|  42.3867|
|       Clothing|United Kingdom|  00:00.0| 113.0312|
|       Clothing|United Kingdom|  00:00.0|  42.3867|
|     Components|United Kingdom|  00:00.0|  826.164|
|     Components|United Kingdom|  00:00.0|  149.676|
|     Components|United Kingdom|  00:00.0| 1472.291|
|     Components|United Kingdom|  00:00.0| 736.1455|
|     Components|United Kingdom|  00:00.0| 744

In [17]:
salesterritory = df.groupBy('saleterritory').count()


In [18]:
print(salesterritory.show())

+--------------+-----+
| saleterritory|count|
+--------------+-----+
|       Germany| 1864|
|        France| 3530|
|     Northwest| 7872|
|     Southeast| 5937|
|       Central| 5812|
|        Canada|11444|
|     Southwest|13379|
|     Australia| 1713|
|United Kingdom| 3520|
|     Northeast| 5809|
+--------------+-----+

None


In [19]:
df.createOrReplaceTempView("sales")
output = spark.sql("SELECT * from sales where productsubcategory='Caps'")
output.show()

+---------------+------------------+------------+--------------+--------------+------------+------------+----------------+--------------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|productcategory|productsubcategory|     product| saleterritory|       Country|        City|        Sate|        Customer|      Employee|OrderCount|OrderDate|StandardCost|UnitPrice|UnitPriceDiscount|Discount|ListPrice|SaleswithStandard|  NetSales|OrderQuantity|    Sales|
+---------------+------------------+------------+--------------+--------------+------------+------------+----------------+--------------+----------+---------+------------+---------+-----------------+--------+---------+-----------------+----------+-------------+---------+
|       Clothing|              Caps|AWC Logo Cap|United Kingdom|United Kingdom|       Berks|     England|      Gary Suess|   Amy Alberts|         1|  00:00.0|      6.9223|   5.0136|   

In [22]:
dest_tbl = 'public."pyspark_sales_table"'
database = "AdventureWorks"
password = "password"
user = "user"
#
df.write.mode("overwrite") \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://localhost:5432/{database}") \
    .option("dbtable", dest_tbl) \
    .option("user", user) \
    .option("password", password) \
    .option("driver",  "org.postgresql.Driver") \
    

<pyspark.sql.readwriter.DataFrameWriter at 0x2064f34f4f0>