In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id, row_number,min
import os
import pandas

In [7]:
# Raw inventory extract, resolved relative to the notebook's working directory.
INPUT_DIR = 'input-data-set'
file_path = f'{INPUT_DIR}/Inventory_Management_Src1.csv'



In [8]:
# Initialize the Spark session shared by every cell below. getOrCreate() hands
# back the already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .appName("Star Schema")
    .getOrCreate()
)



In [4]:

# Keep the notebook output readable: only ERROR-level Spark log messages are shown.
spark_context = spark.sparkContext
spark_context.setLogLevel("ERROR")

In [44]:


# Load the raw inventory extract. header=True takes column names from the first
# row; inferSchema=True asks Spark to detect column types from the data.
try:
    df = spark.read.csv(file_path, header=True, inferSchema=True)
except Exception as e:
    print("an error was occured reading the main input file " )
    print( "ERROR: " , e)
    # Every later cell depends on `df`. Re-raise so "Run All" stops here instead of
    # cascading into misleading "name 'df' is not defined" errors further down.
    raise
    

In [46]:
# Define the folder that receives the dimension/fact CSVs and make sure it exists.
# exist_ok=True makes the cell safe to re-run. It also avoids the race between
# "check it exists" and "create it" that a separate os.path.exists() test has.
output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

In [57]:
#  Customers dimension: one row per distinct customer record, numbered with a
#  sequential surrogate key CUSTOMER_DIM_ID.
customer_columns = [
    "CUSTOMER_ID", "Customer Name", "CUSTOMER_LOGIN_ID",
    "CUSTOMER_STREET_ADDRESS", "CUSTOMER_CITY", "CUSTOMER_STATE",
    "CUSTOMER_ZIP", "CUSTOMER_PHONE_NO",
]
try:
    customers_df = df.select(*customer_columns).dropDuplicates()

    # windowSpec is reused by the product, seller, time, transaction and fact
    # cells below, so this cell must run before them. row_number() over it
    # assigns 1..N in monotonically_increasing_id() order.
    # NOTE(review): the window has no partitionBy, so Spark evaluates it in a
    # single partition. That is fine for small dimensions; revisit if data grows.
    windowSpec = Window.orderBy(monotonically_increasing_id())
    customers_dim = customers_df.withColumn(
        "CUSTOMER_DIM_ID", row_number().over(windowSpec)
    )

    customers_dim.show()
except Exception as e:
    print("an error was occured while creating cutomers dimension table " )
    print( "ERROR: " , e)
    



+-----------+------------------+-----------------+-----------------------+-------------+--------------+------------+-----------------+---------------+
|CUSTOMER_ID|     Customer Name|CUSTOMER_LOGIN_ID|CUSTOMER_STREET_ADDRESS|CUSTOMER_CITY|CUSTOMER_STATE|CUSTOMER_ZIP|CUSTOMER_PHONE_NO|CUSTOMER_DIM_ID|
+-----------+------------------+-----------------+-----------------------+-------------+--------------+------------+-----------------+---------------+
|     100078|     Caldwell Wade|             5078|   216-4377 Semper, St.|     COLUMBUS|          OHIO|       94154|     027-779-9608|              1|
|     100003|       Fritz Grant|             5003|   Ap #897-7736 Eges...| PHILADELPHIA|  PENNSYLVANIA|       65342|     029-197-3614|              2|
|     100082|    Graiden Oneill|             5082|   P.O. Box 708, 751...|       BOSTON|MASSAACHUSETTS|       95485|     011-485-6145|              3|
|     100040|       Caryn Doyle|             5040|      3258 Massa Avenue|     NEW YORK|      

In [51]:
# Persist the customers dimension as one CSV file inside output_dir.
# toPandas() collects every row onto the driver. That is presumably acceptable
# for a dimension table of this size; revisit if it grows.
try:
    customers_dim_pandas = customers_dim.toPandas()
    customers_csv_path = f"{output_dir}/customers_dim.csv"
    customers_dim_pandas.to_csv(customers_csv_path, index=False)
except Exception as e:
    print("an error was occured while writing(saving) cutomers dimension table " )
    print( "ERROR: " , e)
    

In [56]:
# Products dimension: one row per product attribute combination, numbered with
# a sequential surrogate key PRODUCT_DIM_ID.
try:
    # Presumably PRODUCT_STOCK differs between source rows for the same product,
    # so a plain dropDuplicates() would emit several rows per product. Instead,
    # collapse them and keep the minimum stock value.
    # `min` here is pyspark.sql.functions.min from the import cell (it shadows
    # Python's builtin min in this notebook).
    products_df = (
        df.groupBy("PRODUCT_ID", "CATEGORY_ID", "PRODUCT Name", "PRODUCT Brand",
                   "Product Model No")
          .agg(min(col("PRODUCT_STOCK")).alias("PRODUCT_STOCK"))
    )

    # windowSpec is defined in the customers-dimension cell, which must run first.
    products_dim = products_df.withColumn("PRODUCT_DIM_ID", row_number().over(windowSpec))

    products_dim.show()

except Exception as e:
    print("an error was occured while creating product dimension table " )
    print( "ERROR: " , e)
    


+----------+-----------+------------+-------------+----------------+-------------+--------------+
|PRODUCT_ID|CATEGORY_ID|PRODUCT Name|PRODUCT Brand|Product Model No|PRODUCT_STOCK|PRODUCT_DIM_ID|
+----------+-----------+------------+-------------+----------------+-------------+--------------+
|      3005|       4005|      LAPTOP|           HP|            4620|            1|             1|
|      3093|       4005|      LAPTOP|      TOSHIBA|            4978|            1|             2|
|      3047|       4004|  TELEVISION|      PHILIPS|            4500|            0|             3|
|      3073|       4006|    PENDRIVE|      SANDISK|            9457|            1|             4|
|      3033|       4007|   EARPHONES|         SONY|           11456|            1|             5|
|      3022|       4004|  TELEVISION|           LG|            4950|            1|             6|
|      3094|       4006|    PENDRIVE|     KINGSTON|            9965|            0|             7|
|      3074|       4

In [53]:
#changing to pandas and saving as csv file
try:
    products_dim_pandas = products_dim.toPandas()
    products_dim_pandas.to_csv(f"{output_dir}/products_dim.csv", index=False)
    
except Exception as e:
    print("an error was occured while saving(writing) product dimension table " )
    print( "ERROR: " , e)
    

In [55]:
# Sellers dimension: distinct seller records, numbered with a sequential
# surrogate key SELLER_DIM_ID.
seller_columns = [
    "SELLER_ID", "SELLER_NAME", "SELLER_RATING", "SELLER_STREET_ADDRESS",
    "SELLER_CITY", "SELLER_STATE", "SELLER_ZIP", "SELLER_PHONE_NO",
]
try:
    sellers_df = df.select(*seller_columns).dropDuplicates()

    # Numbering reuses windowSpec from the customers-dimension cell.
    sellers_dim = sellers_df.withColumn(
        "SELLER_DIM_ID", row_number().over(windowSpec)
    )

    sellers_dim.show()
except Exception as e:
    print("an error was occured while creating seller dimension table " )
    print( "ERROR: " , e)
    

+---------+----------------+-------------+---------------------+-----------+--------------------+----------+---------------+-------------+
|SELLER_ID|     SELLER_NAME|SELLER_RATING|SELLER_STREET_ADDRESS|SELLER_CITY|        SELLER_STATE|SELLER_ZIP|SELLER_PHONE_NO|SELLER_DIM_ID|
+---------+----------------+-------------+---------------------+-----------+--------------------+----------+---------------+-------------+
|   200046|    Leroy Conner|            5| P.O. Box 965, 585...|      BREST|            BRITTANY|     76254| 08 90 52 61 56|            1|
|   200020|   Basia Nielsen|            4| Ap #755-3390 Semp...|      TOURS|              CENTRE|     80874| 03 11 73 89 94|            2|
|   200091|    Leo Atkinson|            4|  134-8687 Primis St.|      BREST|            BRITTANY|     81059| 06 56 07 84 00|            3|
|   200089|  Deborah Wooten|            5|      7708 Lorem, Rd.|       LYON|         RHONE-ALPES|     58575| 01 15 87 00 65|            4|
|   200011|   Jescie Kelley

In [1]:
# Persist the sellers dimension as one CSV file inside output_dir
# (toPandas() collects all rows onto the driver first).
try:
    sellers_dim_pandas = sellers_dim.toPandas()
    sellers_csv_path = f"{output_dir}/sellers_dim.csv"
    sellers_dim_pandas.to_csv(sellers_csv_path, index=False)
except Exception as e:
    print("an error was occured while writing  seller dimension table " )
    print( "ERROR: " , e)
    

an error was occured while writing  seller dimension table 
ERROR:  name 'sellers_dim' is not defined


In [2]:

# Create Time Dimension: one row per distinct raw "Date" value, with calendar
# attributes derived from it and a sequential surrogate key TIME_DIM_ID.
try:
    from pyspark.sql.functions import to_date, year, quarter, month, dayofweek

    time_df = df.select("Date").dropDuplicates()

    # Build the parse expression once (pattern dd-MMM-yy, e.g. "12-Aug-11") and
    # derive every calendar attribute from that same parsed value.
    parsed_date = to_date(col("Date"), "dd-MMM-yy")

    time_dim = (
        time_df
        .withColumn("YEAR", year(parsed_date))
        .withColumn("QUARTER", quarter(parsed_date))
        .withColumn("MONTH", month(parsed_date))
        # Spark's dayofweek numbering: 1 = Sunday ... 7 = Saturday.
        .withColumn("DAY_OF_WEEK", dayofweek(parsed_date))
        # windowSpec is defined in the customers-dimension cell.
        .withColumn("TIME_DIM_ID", row_number().over(windowSpec))
    )

    # Call show() on the DataFrame itself. `time_dim.select.show()` would look up
    # `.show` on the bound method `select` and raise AttributeError.
    time_dim.show()

except Exception as e:
    print("an error was occured while creating Date dimension table " )
    print( "ERROR: " , e)


an error was occured while creating Date dimension table 
ERROR:  name 'df' is not defined


In [4]:
# Persist the time dimension as one CSV file inside output_dir.
try:
    time_dim_pandas = time_dim.toPandas()
    time_csv_path = f"{output_dir}/time_dim.csv"
    time_dim_pandas.to_csv(time_csv_path, index=False)
except Exception as e:
    print("an error was occured while saving time dimension table " )
    print( "ERROR: " , e)

an error was occured while saving time dimension table 
ERROR:  name 'time_dim' is not defined


In [11]:
#  Transaction dimension: distinct transaction records (amount, payment type,
#  and the dispatch / expected / delivery dates), numbered with a sequential
#  surrogate key TRANSACTION_DIM_ID.
transaction_columns = (
    "TRANSACTION_ID", "TRANSACTION_DATE", "TRANSACTION_AMOUNT",
    "TRANSACTION_TYPE", "DISPATCH_DATE", "EXPECTED_DATE", "DELIVERY_DATE",
)
try:
    transaction_df = df.select(*transaction_columns).dropDuplicates()

    # Numbering reuses windowSpec from the customers-dimension cell.
    transaction_dim = transaction_df.withColumn(
        "TRANSACTION_DIM_ID", row_number().over(windowSpec)
    )

    transaction_dim.show()
except Exception as e:
    print("an error was occured while creating transaction dimension table " )
    print( "ERROR: " , e)


+------------------+--------------+----------------+------------------+----------------+-------------+-------------+-------------+
|TRANSACTION_DIM_ID|TRANSACTION_ID|TRANSACTION_DATE|TRANSACTION_AMOUNT|TRANSACTION_TYPE|DISPATCH_DATE|EXPECTED_DATE|DELIVERY_DATE|
+------------------+--------------+----------------+------------------+----------------+-------------+-------------+-------------+
|                 1|          5223|       12-Aug-11|              1900|             COD|    13-Aug-11|    16-Aug-11|    16-Aug-11|
|                 2|          5330|       27-Nov-11|              2100|     NET BANKING|    28-Nov-11|    01-Dec-11|    01-Dec-11|
|                 3|          5341|       08-Dec-11|              1800|     CREDIT CARD|    09-Dec-11|    12-Dec-11|    12-Dec-11|
|                 4|          5741|       11-Jan-13|               600|             COD|    12-Jan-13|    15-Jan-13|    15-Jan-13|
|                 5|          5860|       11-May-13|              8500|          PA

In [5]:
# Persist the transaction dimension as one CSV file inside output_dir.
try:
    transaction_dim_pandas = transaction_dim.toPandas()
    transaction_csv_path = f"{output_dir}/transaction_dim.csv"
    transaction_dim_pandas.to_csv(transaction_csv_path, index=False)
except Exception as e:
    print("an error was occured while writing  trasaction dimension table " )
    print( "ERROR: " , e)



an error was occured while writing  trasaction dimension table 
ERROR:  name 'transaction_dim' is not defined


In [25]:

#  Inventory fact table: distinct (date, transaction, product, customer, seller,
#  cost, selling price) rows, joined to the time dimension (which contributes
#  TIME_DIM_ID and the calendar columns) and numbered with a sequential FACT_ID.
try:
    fact_columns = [
        "DATE", "TRANSACTION_ID", "PRODUCT_ID", "CUSTOMER_ID",
        "SELLER_ID", "PRODUCT_COST_PRICE", "PRODUCT_SELLING_PRICE",
    ]
    # NOTE(review): time_dim's column is spelled "Date". Joining on "DATE" relies
    # on Spark's default case-insensitive column resolution.
    # NOTE(review): products/customers/sellers are referenced by their natural
    # IDs, not by the *_DIM_ID surrogate keys. Confirm this is intended.
    inventory_fact_df = (
        df.select(*fact_columns)
          .dropDuplicates()
          .join(time_dim, "DATE", "inner")
    )

    # Numbering reuses windowSpec from the customers-dimension cell.
    inventory_fact = inventory_fact_df.withColumn(
        "FACT_ID", row_number().over(windowSpec)
    )

    inventory_fact.show()
except Exception as e:
    print("an error was occured while creating inventory fact table " )
    print( "ERROR: " , e)



+-------+-----------+--------------+----------+-----------+---------+------------------+---------------------+
|FACT_ID|TIME_DIM_ID|TRANSACTION_ID|PRODUCT_ID|CUSTOMER_ID|SELLER_ID|PRODUCT_COST_PRICE|PRODUCT_SELLING_PRICE|
+-------+-----------+--------------+----------+-----------+---------+------------------+---------------------+
|      1|        917|          5282|      3026|     100098|   200071|             38000|                41000|
|      2|        278|          5303|      3058|     100031|   200043|              1200|                 1100|
|      3|        794|          5582|      3080|     100025|   200003|              1900|                 2000|
|      4|        683|          5031|      3093|     100021|   200019|             47000|                52000|
|      5|        829|          5676|      3003|     100011|   200042|              6500|                 6750|
|      6|        163|          5252|      3024|     100079|   200096|              6000|                 6200|
|

In [28]:
# Persist the inventory fact table. Only the key and measure columns are written,
# in a fixed order; the calendar columns brought in by the time join are dropped.
try:
    fact_output_columns = [
        "FACT_ID", "TIME_DIM_ID", "TRANSACTION_ID",
        "PRODUCT_ID", "CUSTOMER_ID", "SELLER_ID",
        "PRODUCT_COST_PRICE", "PRODUCT_SELLING_PRICE",
    ]
    inventory_fact_pandas = inventory_fact.select(*fact_output_columns).toPandas()

    # Write into the configured output_dir like every dimension table, rather than
    # a hard-coded "output/" path that ignores the setting.
    inventory_fact_pandas.to_csv(f"{output_dir}/inventory_fact.csv", index=False)

except Exception as e:
    print("an error was occured while saving invenotry fact table " )
    print( "ERROR: " , e)

In [39]:
# Shut down the Spark session once every dimension and fact table has been written.
spark.stop()