In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, monotonically_increasing_id, split, col, isnan, when, count
from pyspark.sql.types import StringType, StructType, DoubleType

# Assembly or download spark-excel and its dependencies
jars = [
    "C:\\Users\\Dsaad\\GitHub\\pyspark\\jars\\spark-hadoopoffice-ds_2.12-1.6.3.jar"    
]

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", ",".join(jars)) \
    .config("spark.driver.memory", "10g") \
    .getOrCreate()

spark

In [30]:
# create a schema to read the excel file
schema = StructType() \
        .add("c0",StringType(),True) \
        .add("c1",StringType(),True) \
        .add("c2",StringType(),True)

# read the excel file to fact DataFrame with custom schema
fact = spark.read.format('org.zuinnote.spark.office.excel')\
            .option("read.spark.simpleMode",True) \
            .schema(schema) \
            .load("./data/skuToGUID.xlsx")

fact = fact.drop(col('c0'))

# dropping all the Null and None values
fact = fact.na.drop(how='any' , subset=["c1","c2"])

# renaming columns
fact = fact.withColumnRenamed("c1","productGUID") \
           .withColumnRenamed("c2","SKU") \
           .withColumn("SKU",col("SKU").cast(StringType())) \
           .withColumn("productGUID",col("productGUID").cast(StringType()))

fact.show(10)

fact.printSchema()

# counting nulls
fact.select( [ count( when(isnan(c) | col(c).isNull(), c) ).alias(c) for c in fact.columns] ).show(truncate = True)

# create a temp view
fact.createOrReplaceTempView("fact")

+--------------------+----------+
|         productGUID|       SKU|
+--------------------+----------+
|        Product_GUID|       SKU|
|51D0242F-ADCC-4B7...|2608580315|
|B18557B0-E0FD-4A3...|   AT2550A|
|0A4DC340-167F-44B...|2609200213|
|69D084D2-1297-438...|2607224392|
|0820E32F-06A6-460...|2608608H66|
|45A90BED-BA15-4A9...|0600882174|
|45A90BED-BA15-4A9...|06008A6271|
|91765661-8B3C-4F1...|06019B2901|
|91765661-8B3C-4F1...|06019B2902|
+--------------------+----------+
only showing top 10 rows

root
 |-- productGUID: string (nullable = true)
 |-- SKU: string (nullable = true)

+-----------+---+
|productGUID|SKU|
+-----------+---+
|          0|  0|
+-----------+---+



In [32]:
# reading the 31 excel file containg the sales data
df2 = spark.read.format('org.zuinnote.spark.office.excel')\
    .option("read.spark.simpleMode", True) \
    .option("columnNameOfRowNumber", "RowID") \
    .load("./data/sales/")

# adding a RowID
df2 = df2.withColumn("RowID", monotonically_increasing_id())

# selecting the needed the columns
df2 = df2.select(["RowID",'c5','c6','c7','c8','c14','c15'])

# filtering the header
df2 = df2.filter( ( df2['c5'] != '' ) & ( df2['c6'] != '' ) & ( df2['c7'] != '' ) & ( df2['c8'] != '' ) )

# Dropping all the Null and None values
df2 = df2.na.drop(how='any' , subset=["c5","c6","c7","c8","c14","c15"])

# cleaning the quantity column and removing 'PCE'
df2 = df2.withColumn('quantity', split(df2['c14'], ' ').getItem(0))
df2 = df2.drop(col('c14'))

# renaming columns and casting types
df2 = df2.withColumnRenamed("c5","productGroupID") \
         .withColumnRenamed("c6","productGroupName") \
         .withColumnRenamed("c7","materialID") \
         .withColumnRenamed("c8","materialName") \
         .withColumnRenamed("c15","sales") \
         .withColumn("sales", col("sales").cast(DoubleType())) \
         .withColumn("quantity", col("quantity").cast(DoubleType()))

# Dropping all the Null and None values
df2 = df2.na.drop(how='any' , subset=["sales","quantity"])

# filtering the negative values
df2 = df2.filter( ( df2['sales'] >= 0 ) & ( df2['quantity'] >= 0 ) )

# remove all data which is attributed to the Product Group 175
sales = df2.filter( df2['productGroupID'] != 175) 

# showing the result
sales.show(10)

sales.printSchema()

# counting nulls
sales.select( [ count( when(isnan(c) | col(c).isNull(), c) ).alias(c) for c in sales.columns] ).show()

# create a temp view
sales.createOrReplaceTempView("sales")

+-----+--------------+--------------------+----------+--------------------+-----+--------+
|RowID|productGroupID|    productGroupName|materialID|        materialName|sales|quantity|
+-----+--------------+--------------------+----------+--------------------+-----+--------+
|    5|           005|Impact Drills < 500W|0603130004|      EasyImpact 550| 37.0|     1.0|
|    6|           005|Impact Drills < 500W|0603130004|      EasyImpact 550| 26.0|     1.0|
|    7|           005|Impact Drills < 500W|0603130020|      EasyImpact 550|159.0|     5.0|
|    8|           009|Rotary Hammers 1-2KG|06033A9302|PBH 2100 RE + drills|106.0|     2.0|
|    9|           009|Rotary Hammers 1-2KG|06033A9303|   PBH 2100 RE + set| 56.0|     1.0|
|   10|           009|Rotary Hammers 1-2KG|06033A9320|         PBH 2100 RE|233.0|     4.0|
|   11|           017|       Drill Drivers|060397290X|      EasyDrill 12-2|268.0|     4.0|
|   12|           017|       Drill Drivers|06039A210B|      EasyDrill 1200|110.0|     2.0|

In [33]:
spark.sql('''SELECT fact.productGUID
                   ,sales.materialName
                   ,sum(sales.quantity) as sumOfQuantity
                   ,sum(sales.sales) as sumOfSales
                FROM sales
                JOIN fact
                ON sales.materialID = fact.SKU
                GROUP BY fact.productGUID, sales.materialName
            ''').show(10)

+--------------------+-------------------+-------------+----------+
|         productGUID|       materialName|sumOfQuantity|sumOfSales|
+--------------------+-------------------+-------------+----------+
|5324F254-9A9C-475...|          PKS 66 AF|        181.0|   21883.0|
|99571ACA-CB1C-4B9...|     EasyImpact 500|       1062.0|   30766.0|
|DF3A1BA5-1FDC-474...|           ART 27 +|         37.0|    1887.0|
|9B0EE701-B7BF-4E3...|      PSB 1800 Li-2|        114.0|    8335.0|
|94B8DC22-EA8D-4F4...|          AXT 25 TC|        163.0|   53870.0|
|3F940372-813C-416...|            PTC 470|         48.0|    4272.0|
|7A0531CD-257B-475...|  AdvancedRotak 770|         19.0|    3746.0|
|86313E77-BE96-41D...|          ART 23-28|          5.0|      90.0|
|B671FB55-CEAB-42F...|    UniversalVac 18|        726.0|   32296.0|
|2A1C04BA-43D2-4F1...|EasyHedgeCut 12-450|         16.0|    1070.0|
+--------------------+-------------------+-------------+----------+
only showing top 10 rows



In [34]:
sales.join(fact, sales.materialID == fact.SKU, "inner") \
     .select(['productGUID' ,'materialName', 'quantity' ,'sales']) \
     .groupBy("productGUID","materialName") \
     .sum("quantity","sales") \
     .show()

+--------------------+-------------------+-------------+----------+
|         productGUID|       materialName|sum(quantity)|sum(sales)|
+--------------------+-------------------+-------------+----------+
|5324F254-9A9C-475...|          PKS 66 AF|        181.0|   21883.0|
|99571ACA-CB1C-4B9...|     EasyImpact 500|       1062.0|   30766.0|
|DF3A1BA5-1FDC-474...|           ART 27 +|         37.0|    1887.0|
|9B0EE701-B7BF-4E3...|      PSB 1800 Li-2|        114.0|    8335.0|
|94B8DC22-EA8D-4F4...|          AXT 25 TC|        163.0|   53870.0|
|3F940372-813C-416...|            PTC 470|         48.0|    4272.0|
|7A0531CD-257B-475...|  AdvancedRotak 770|         19.0|    3746.0|
|86313E77-BE96-41D...|          ART 23-28|          5.0|      90.0|
|B671FB55-CEAB-42F...|    UniversalVac 18|        726.0|   32296.0|
|2A1C04BA-43D2-4F1...|EasyHedgeCut 12-450|         16.0|    1070.0|
|A37AFACE-3433-40C...|       AHS 48-20 LI|        106.0|   11014.0|
|83797DF3-15FF-4F2...|     AXT RAPID 2000|      