In [1]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F

In [3]:
# Point PySpark at a local JDK, but only when JAVA_HOME is not already set.
# The path below is machine-specific (a Windows install); setdefault keeps an
# existing JAVA_HOME from the shell instead of silently overriding it.
os.environ.setdefault("JAVA_HOME", r"C:\Program Files\Java\jdk-18.0.2.1")

In [4]:
# Spark configuration: local mode on all cores, with the jars under G:/pyspark/
# (presumably including the SQL Server JDBC driver used below) on both the
# driver and executor classpaths.
jar_classpath = "G:/pyspark/*"
conf = (
    SparkConf()
    .setAppName("Example")
    .setMaster("local[*]")
    .set("spark.driver.extraClassPath", jar_classpath)
    .set("spark.executor.extraClassPath", jar_classpath)
)

In [None]:
# Create (or reuse) the Spark session through the documented builder entry point.
# getOrCreate() returns the active session if one exists, otherwise it starts a
# SparkContext from `conf` and wraps it in a new session.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext  # keep the SparkContext handle available as before
spark

In [None]:
# Source table location in SQL Server.
database = "AdventureWorksDW2019"
table = "DimProduct"
schema = "dbo"

# Credentials are read from the environment so they are never stored in the
# notebook. Report every missing variable at once instead of failing with a
# bare KeyError on the first one.
_required_env = ("PGPASS", "PGUID")
_missing_env = [name for name in _required_env if name not in os.environ]
if _missing_env:
    raise KeyError(
        "Set the environment variable(s) " + ", ".join(_missing_env)
        + " before running this cell"
    )
password = os.environ["PGPASS"]
user = os.environ["PGUID"]

In [None]:
# JDBC connection string for the local SQL Server instance on port 1433.
# encrypt=true with trustServerCertificate=true encrypts traffic but skips
# server-certificate validation: fine for localhost, not for remote hosts.
jdbc_url = (
    "jdbc:sqlserver://localhost:1433;"
    f"database={database};"
    "encrypt=true;"
    "trustServerCertificate=true;"
)

In [None]:
# Load the product table (schema.table) from SQL Server over JDBC.
df = (
    spark.read.format("jdbc")
    .options(
        url=jdbc_url,
        dbtable=f"{schema}.{table}",
        user=user,
        password=password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )
    .load()
)

In [None]:
df.show(n=20, truncate=True)  # spelled-out defaults: first 20 rows, long values truncated

In [None]:
# Bring only the first 10 rows to the driver and render them as a pandas table.
first_rows = df.limit(10)
first_rows.toPandas()

In [None]:
# Shorten the product-name column to ProductName for the rest of the notebook.
df = df.withColumnRenamed(existing="EnglishProductName", new="ProductName")
df.show()

In [None]:
product_rows = df.count()  # count() is an action, so this executes the JDBC read
product_rows

In [None]:
# Keep only the key, name and colour columns.
keep_cols = ["ProductKey", "ProductName", "Color"]
df = df.select(*keep_cols)
df.show()

In [None]:
df.orderBy("ProductName").show()  # ascending; orderBy is an alias of sort

In [None]:
# Descending sort by product name.
df.sort(F.desc("ProductName")).show()

In [None]:
# Print the column names, types and nullability of the current DataFrame.
df.printSchema()

In [None]:
# DataFrame API: look up a single product by its key.
df.filter(F.col("ProductKey") == 22).select("ProductKey", "ProductName").show()

In [None]:
# Same lookup, referencing the columns through the DataFrame object.
df.select(df["ProductKey"], df["ProductName"]).where("ProductKey = 22").show()

In [None]:
# Products whose name contains "helmet". Spark's LIKE is case-sensitive, so the
# name is lower-cased first; otherwise a capitalised "Helmet" would not match.
df.select("ProductKey", "ProductName").filter("lower(ProductName) like '%helmet%'").show()

In [None]:
# Black helmets via the Column API. like() is case-sensitive, so match on the
# lower-cased product name.
df.filter(F.lower(df.ProductName).like('%helmet%') & (df.Color == 'Black')).show()

In [None]:
# Register the products as a temp view so they can be queried with Spark SQL,
# then count its rows.
df.createOrReplaceTempView("Product")
row_count_query = "select count(1) from Product"
spark.sql(row_count_query).show()

In [None]:
# Spark SQL equivalent of the ProductKey = 22 lookup.
lookup_query = "select ProductKey, ProductName from Product where ProductKey = 22"
spark.sql(lookup_query).show()

In [None]:
# Spark SQL LIKE is case-sensitive, so lower-case the name before matching "helmet".
spark.sql("select ProductKey, ProductName from Product where lower(ProductName) like '%helmet%'").show()

In [None]:
# Load the internet-sales fact table from the same database and schema as the
# product table (reusing `schema` instead of hard-coding "dbo").
sales = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", f"{schema}.FactInternetSales")
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)

In [None]:
# Expose the sales DataFrame to Spark SQL and mark it for caching. cache() is
# lazy, so rows are materialised by the first action that reads them.
sales_view = "sales"
sales.createOrReplaceTempView(sales_view)
sales.cache()

In [None]:
# Total sales per helmet product: join the Product view to the sales view and
# aggregate. Spark's LIKE is case-sensitive, so the name is lower-cased first.
spark.sql("""
SELECT
    p.ProductName,
    SUM(s.SalesAmount) AS SalesAmount
FROM Product p
    INNER JOIN sales s ON p.ProductKey = s.ProductKey
WHERE lower(p.ProductName) LIKE '%helmet%'
GROUP BY
    p.ProductName
ORDER BY
    SUM(s.SalesAmount) DESC"""
).show()

In [None]:
# DataFrame-API join: attach product name and colour to each sales row on ProductKey.
salesjoined = sales.join(df, on="ProductKey", how="inner")
salesjoined.limit(10).toPandas()

In [None]:
# Group by product and colour, using column aliases for the total and the
# largest single SalesAmount.
salesjoined.groupBy("ProductName", "Color").agg(
    F.sum("SalesAmount").alias("TotalSalesAmount"),
    F.max("SalesAmount").alias("MaxSalesAmount"),
).show()

In [None]:
# NetSales = SalesAmount - TaxAmt.
# NOTE(review): confirm SalesAmount includes tax; if it is already pre-tax,
# subtracting TaxAmt understates net sales.
saleswithNet = salesjoined.withColumn(
    "NetSales", salesjoined["SalesAmount"] - salesjoined["TaxAmt"]
)
saleswithNet.limit(10).toPandas()

In [None]:
# Derive a coarse Region label from SalesTerritoryKey: keys 7, 8 and 10 map to
# Europe, key 9 to Pacific, and everything else (including nulls) to Americas.
territory = F.col("SalesTerritoryKey")
saleswithNet = saleswithNet.withColumn(
    "Region",
    F.when(territory.isin(7, 8, 10), "Europe")
    .when(territory == 9, "Pacific")
    .otherwise("Americas"),
)
saleswithNet.limit(10).toPandas()

In [None]:
# Sanity check: list the distinct Region labels produced above.
distinct_regions = saleswithNet.select("Region").distinct()
distinct_regions.collect()

In [None]:
# Replace missing carrier tracking numbers with the string "0".
# coalesce() returns its first non-null argument, so it expresses the
# when(isNull).otherwise(...) pattern in a single call.
saleswithNet = saleswithNet.withColumn(
    "CarrierTrackingNumber",
    F.coalesce(F.col("CarrierTrackingNumber"), F.lit("0")),
)
saleswithNet.limit(10).toPandas()

In [None]:
# Drop the CustomerPONumber column and preview the result.
unused_cols = ["CustomerPONumber"]
saleswithNet = saleswithNet.drop(*unused_cols)
saleswithNet.limit(10).toPandas()