# PEI-Task

In [None]:
import pyspark
from pyspark.sql.types import LongType, StringType, DateType, DoubleType
from pyspark.sql.functions import udf, to_date, col, to_timestamp, regexp_replace
from pyspark.sql import SparkSession
import re

# Obtain the Spark session used by every cell below; getOrCreate() reuses an
# already-active session when there is one, otherwise it builds one named "PEITask".
spark = SparkSession.builder.appName("PEITask").getOrCreate()

# Install Maven library "com.crealytics:spark-excel_2.12:3.3.2_0.19.0" for spark 3.3
# (provides the "com.crealytics.spark.excel" reader format used for Customer.xlsx)

### Common functions

In [None]:
# Normalise dataframe column names: spaces and hyphens become underscores
def clean_column_names(df):
    """Return ``df`` with every column whose name contains a space or hyphen renamed.

    Affected names are stripped of surrounding whitespace and have each space
    and hyphen replaced by an underscore. Columns without either character are
    left exactly as they are.
    """
    names_to_fix = [name for name in df.columns if ' ' in name or '-' in name]
    for original_name in names_to_fix:
        tidy_name = original_name.strip().replace(' ', '_').replace('-', '_')
        df = df.withColumnRenamed(original_name, tidy_name)
    return df

In [None]:
# Assert that a dataframe's column names match (positive case) or differ from
# (negative case) an expected list
def test_clean_column_names(df, expected_columns, isPositivecase=True):
    """Check the column names of ``df`` against ``expected_columns``.

    Args:
        df: Object exposing a ``columns`` list (a Spark DataFrame in this notebook).
        expected_columns: List of column names to compare with ``df.columns``.
        isPositivecase: When truthy the lists must be equal; otherwise they
            must differ.

    Raises:
        AssertionError: If the comparison does not hold. The message includes
            the expected and actual names so a failure can be diagnosed
            without re-running the cell.
    """
    actual_columns = df.columns
    if isPositivecase:
        assert actual_columns == expected_columns, (
            f"Columns are not formatted correctly: expected {expected_columns}, got {actual_columns}"
        )
    else:
        assert actual_columns != expected_columns, (
            f"Columns unexpectedly match the negative-case list: {expected_columns}"
        )

In [None]:
# Assert that the given columns have (positive case) or do not have (negative
# case) the expected data types in the dataframe
def test_column_datatype(df_order, expected_schema, isPositivecase=True):
    """Check the data type of selected columns.

    Args:
        df_order: Dataframe whose ``schema[column].dataType`` is inspected.
        expected_schema: Mapping of column name to the data type to compare with.
        isPositivecase: When truthy each column must have the mapped type;
            otherwise each column must have a different type.

    Raises:
        AssertionError: On the first column that violates the expectation.
    """
    for column, expected_dtype in expected_schema.items():
        actual_dtype = df_order.schema[column].dataType
        if isPositivecase:
            assert actual_dtype == expected_dtype, (
                f"Failed to cast {column} column to {expected_dtype} (actual type: {actual_dtype})"
            )
        else:
            # In the negative case the failure is that the type DOES match,
            # so the message must not claim a cast failed.
            assert actual_dtype != expected_dtype, (
                f"{column} column unexpectedly has type {expected_dtype} in negative case"
            )

#Order

In [None]:

# Load the raw orders (multi-line JSON) and normalise the column names
df_order = clean_column_names(
    spark.read.option("multiline", "true").json("dbfs:/FileStore/PEI_Files/Order.json")
)

# Price: drop every character that is not a digit or a dot, then cast to double
df_order = df_order.withColumn(
    "Price",
    regexp_replace("Price", "[^0-9.]", "").cast(DoubleType()),
)

# Date columns: parse with the d/M/yyyy pattern, then reduce the timestamp to a date
for date_column in ("Order_Date", "Ship_Date"):
    df_order = df_order.withColumn(
        date_column,
        to_date(to_timestamp(col(date_column), "d/M/yyyy")),
    )

# Remaining numeric columns are plain casts, applied in the listed order
for numeric_column, spark_type in (
    ("Discount", DoubleType()),
    ("Profit", DoubleType()),
    ("Quantity", LongType()),
    ("Row_ID", LongType()),
):
    df_order = df_order.withColumn(numeric_column, col(numeric_column).cast(spark_type))

df_order.show(truncate=False)

+-----------+--------+----------+--------------+-------+---------------+-------+--------+------+----------+--------------+
|Customer_ID|Discount|Order_Date|Order_ID      |Price  |Product_ID     |Profit |Quantity|Row_ID|Ship_Date |Ship_Mode     |
+-----------+--------+----------+--------------+-------+---------------+-------+--------+------+----------+--------------+
|JK-15370   |0.3     |2016-08-21|CA-2016-122581|573.174|FUR-CH-10002961|63.686 |7       |1     |2016-08-25|Standard Class|
|BD-11320   |0.0     |2017-09-23|CA-2017-117485|291.96 |TEC-AC-10004659|102.186|4       |2     |2017-09-29|Standard Class|
|LB-16795   |0.7     |2016-10-06|US-2016-157490|17.0   |OFF-BI-10002824|-14.92 |4       |3     |2016-10-07|First Class   |
|KB-16315   |0.2     |2015-07-02|CA-2015-111703|15.552 |OFF-PA-10003349|5.6376 |3       |4     |2015-07-09|Standard Class|
|DO-13435   |0.2     |2014-10-03|CA-2014-108903|142.488|TEC-AC-10003023|-3.0   |3       |5     |2014-10-03|Same Day      |
|CB-12025   |0.0

Test Cases 

In [None]:
# Test 1 (positive): column names were normalised to underscores
expected_columns = [
    'Customer_ID', 'Discount', 'Order_Date', 'Order_ID', 'Price', 'Product_ID',
    'Profit', 'Quantity', 'Row_ID', 'Ship_Date', 'Ship_Mode',
]
test_clean_column_names(df_order, expected_columns)

# Test 1 (negative): names still containing spaces/hyphens must not match
expected_columns = [
    'Customer ID', 'Discount', 'Order-Date', 'Order ID', 'Price', 'Product ID',
    'Profit', 'Quantity', 'Row-ID', 'Ship_Date', 'Ship_Mode',
]
test_clean_column_names(df_order, expected_columns, isPositivecase=False)

# Test 2 (positive): every cast column carries its target data type
expected_schema = dict(
    Price=DoubleType(),
    Discount=DoubleType(),
    Profit=DoubleType(),
    Quantity=LongType(),
    Row_ID=LongType(),
    Order_Date=DateType(),
    Ship_Date=DateType(),
)
test_column_datatype(df_order, expected_schema)

# Test 2 (negative): deliberately wrong data types must not match
expected_schema = dict(Price=LongType(), Discount=LongType())
test_column_datatype(df_order, expected_schema, isPositivecase=False)

#Customer

In [None]:
# Formats the given customer name by removing stray non-alphabetic characters
def format_customer_name(customer_name):
    """Clean a customer name, returning ``None`` when nothing usable remains.

    Rules, applied in order (a "run" is one or more characters outside a-z/A-Z):
      1. A run between two lowercase letters is removed.
      2. A run between an uppercase and a following lowercase letter is removed.
      3. A run between a lowercase and a following uppercase letter becomes a
         single space (this is the boundary between name parts).
      4. Leading and trailing runs are removed.

    Args:
        customer_name: Raw name string; ``None`` or empty yields ``None``.

    Returns:
        The cleaned name, or ``None`` if the input is falsy or no letters remain.
    """
    if not customer_name:
        return None

    # Lookbehind/lookahead keep the neighbouring letters out of the match.
    # With consuming groups, adjacent runs sharing a letter (e.g. "a1b2c")
    # could not both match in one pass, leaving junk behind ("ab2c").
    cleaned_name = re.sub(r'(?<=[a-z])[^a-zA-Z]+(?=[a-z])', '', customer_name)
    cleaned_name = re.sub(r'(?<=[A-Z])[^a-zA-Z]+(?=[a-z])', '', cleaned_name)
    cleaned_name = re.sub(r'(?<=[a-z])[^a-zA-Z]+(?=[A-Z])', ' ', cleaned_name)

    # Remove all leading and trailing non-alphabetic characters
    cleaned_name = re.sub(r'^[^a-zA-Z]+|[^a-zA-Z]+$', '', cleaned_name)

    # Return None if the cleaned name is empty or contains only whitespace
    cleaned_name = cleaned_name.strip()
    return cleaned_name or None


# Formats the given phone number by removing non-numeric characters
def format_phone_number(phone_number):
    """Normalise a phone number to exactly 10 digits, or return ``None``.

    Steps: drop any extension (everything from the first ``x``/``X``), keep
    digits only, remove a leading ``001`` and any remaining leading zeros,
    then remove a leading country code ``1`` when 11 digits are left.

    Args:
        phone_number: Raw value (string or number); falsy values yield ``None``.

    Returns:
        A 10-digit string, or ``None`` when the value cannot be reduced to
        exactly 10 digits.
    """
    if not phone_number:
        return None

    # Extensions are written after an 'x'; accept either letter case
    main_part = re.split(r'[xX]', str(phone_number), maxsplit=1)[0]

    # Remove non-numeric characters from the phone number
    digits = re.sub(r'\D', '', main_part)

    # Remove leading '001', then any other leading zeros
    digits = re.sub(r'^001', '', digits).lstrip('0')

    # "+1-..." / "1-..." carries the same country code as "001-..."; treat it
    # the same way instead of rejecting the number as 11 digits long.
    if len(digits) == 11 and digits.startswith('1'):
        digits = digits[1:]

    # Ensure the phone number is exactly 10 digits long
    if len(digits) != 10:
        return None

    return digits

In [None]:

# Read the customer workbook through the spark-excel reader (header row,
# inferred schema, dataAddress "Worksheet") and normalise the column names
df_customer = clean_column_names(
    spark.read.format("com.crealytics.spark.excel")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("dataAddress", "Worksheet")
    .load("dbfs:/FileStore/PEI_Files/Customer.xlsx")
)

# Wrap the Python cleaners as string-returning UDFs
format_customer_name_udf = udf(format_customer_name, StringType())
format_phone_number_udf = udf(format_phone_number, StringType())

# Overwrite Customer_Name and Phone with their cleaned values
df_customer = (
    df_customer
    .withColumn("Customer_Name", format_customer_name_udf("Customer_Name"))
    .withColumn("Phone", format_phone_number_udf("Phone"))
)

Test Cases

In [None]:
# Test 1 (positive): column names were normalised to underscores
expected_columns = [
    'Customer_ID', 'Customer_Name', 'email', 'Phone', 'address', 'Segment',
    'Country', 'City', 'State', 'Postal_Code', 'Region',
]
test_clean_column_names(df_customer, expected_columns)

# Test 1 (negative): names still containing spaces/hyphens must not match
expected_columns = [
    'Customer ID', 'Customer-Name', 'email', 'Phone', 'address', 'Segment',
    'Country', 'City', 'State', 'Postal_Code', 'Region',
]
test_clean_column_names(df_customer, expected_columns, isPositivecase=False)

# Column data types are not tested: no column of the customer dataframe is cast

df_customer.show(truncate=False)

+-----------+------------------+-----------------------------+----------+----------------------------------------------------+-----------+-------------+----------------+------------+-----------+-------+
|Customer_ID|Customer_Name     |email                        |Phone     |address                                             |Segment    |Country      |City            |State       |Postal_Code|Region |
+-----------+------------------+-----------------------------+----------+----------------------------------------------------+-----------+-------------+----------------+------------+-----------+-------+
|PW-19240   |Pierre Wener      |bettysullivan808@gmail.com   |4215800902|001 Jones Ridges Suite 338\nJohnsonfort, FL 95462   |Consumer   |United States|Louisville      |Colorado    |80027.0    |West   |
|GH-14410   |Gary Hansen       |austindyer948@gmail.com      |5424150246|00347 Murphy Unions\nAshleyton, IA 29814            |Home Office|United States|Chicago         |Illinois    |60653.

#Product

In [None]:
# Lower-cased names of the 50 US states, used for case-insensitive validation
us_states = {
    'alabama', 'alaska', 'arizona', 'arkansas', 'california', 'colorado',
    'connecticut', 'delaware', 'florida', 'georgia', 'hawaii', 'idaho',
    'illinois', 'indiana', 'iowa', 'kansas', 'kentucky', 'louisiana', 'maine',
    'maryland', 'massachusetts', 'michigan', 'minnesota', 'mississippi',
    'missouri', 'montana', 'nebraska', 'nevada', 'new hampshire', 'new jersey',
    'new mexico', 'new york', 'north carolina', 'north dakota', 'ohio',
    'oklahoma', 'oregon', 'pennsylvania', 'rhode island', 'south carolina',
    'south dakota', 'tennessee', 'texas', 'utah', 'vermont', 'virginia',
    'washington', 'west virginia', 'wisconsin', 'wyoming',
}


# Function to check if state exists and valid
def format_states(state):
    """Return ``state`` unchanged when it names a US state, otherwise ``None``.

    The comparison is case-insensitive. Non-string input, including the
    ``None`` a UDF receives for a null cell, yields ``None`` instead of
    raising ``AttributeError`` on ``.lower()``.
    """
    if not isinstance(state, str):
        return None
    if state.lower() in us_states:
        return state
    return None

In [None]:


# Read the comma-delimited product file (first row is the header) and
# normalise the column names
df_product = clean_column_names(
    spark.read.option("delimiter", ",").csv("dbfs:/FileStore/PEI_Files/Product.csv", header=True)
)

# Wrap the state validator as a string-returning UDF and apply it to State
format_states_udf = udf(format_states, StringType())
df_product = (
    df_product
    .withColumn("State", format_states_udf("State"))
    # Price_per_product becomes a double
    .withColumn("Price_per_product", col("Price_per_product").cast(DoubleType()))
)

df_product.show(truncate=False)

+---------------+---------------+------------+---------------------------------------------------------------+------------+-----------------+
|Product_ID     |Category       |Sub_Category|Product_Name                                                   |State       |Price_per_product|
+---------------+---------------+------------+---------------------------------------------------------------+------------+-----------------+
|FUR-CH-10002961|Furniture      |Chairs      |Leather Task Chair, Black                                      |New York    |81.882           |
|TEC-AC-10004659|Technology     |Accessories |Imation Secure+ Hardware Encrypted USB 2.0 Flash Drive; 16GB   |Oklahoma    |72.99            |
|OFF-BI-10002824|Office Supplies|Binders     |Recycled Easel Ring Binders                                    |Colorado    |4.25             |
|OFF-PA-10003349|Office Supplies|Paper       |Xerox 1957                                                     |Florida     |5.184            |
|TEC-A

Test Cases

In [None]:
# Test 1 (positive): column names were normalised to underscores
expected_columns = [
    'Product_ID', 'Category', 'Sub_Category',
    'Product_Name', 'State', 'Price_per_product',
]
test_clean_column_names(df_product, expected_columns)

# Test 1 (negative): names still containing spaces/hyphens must not match
expected_columns = [
    'Product ID', 'Category', 'Sub Category',
    'Product-Name', 'State', 'Price_per-product',
]
test_clean_column_names(df_product, expected_columns, isPositivecase=False)

# Test 2 (positive): Price_per_product was cast to DoubleType
expected_schema = dict(Price_per_product=DoubleType())
test_column_datatype(df_product, expected_schema)

# Test 2 (negative): Price_per_product must not be LongType
expected_schema = dict(Price_per_product=LongType())
test_column_datatype(df_product, expected_schema, isPositivecase=False)

#Question:1 Create raw tables for each source dataset

In [None]:
def test_save_as_table(df_order, df_customer, df_product):
    """Persist the three raw dataframes as Delta tables and verify the result.

    Each dataframe is written with overwrite mode (schema overwrite enabled)
    to the tables ``Order``, ``Customer`` and ``Product``. Afterwards every
    table must exist and hold as many rows as its source dataframe.

    Raises:
        AssertionError: If a table is missing or its row count differs.
    """
    tables = {"Order": df_order, "Customer": df_customer, "Product": df_product}

    # Save dataframes as Delta tables
    for table_name, source_df in tables.items():
        source_df.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable(table_name)

    # Check if the tables are created, via the public catalog API
    # (PySpark 3.3+) rather than the private JVM session handle
    for table_name in tables:
        assert spark.catalog.tableExists(table_name), f"Table {table_name} was not created"

    # Check the count of the tables
    for table_name, source_df in tables.items():
        saved_rows = spark.sql(f"SELECT COUNT(1) FROM {table_name}").first()[0]
        expected_rows = source_df.count()
        assert saved_rows == expected_rows, (
            f"Table {table_name} has {saved_rows} rows, expected {expected_rows}"
        )


# Run the test function
test_save_as_table(df_order, df_customer, df_product)


###check duplicates in Raw Data

Order

In [None]:
%sql
select Order_ID,Product_ID, count(1) item_count from order
          GROUP BY Order_ID,Product_ID
          HAVING item_count > 1
          order by item_count desc
-- My understanding is that Order_ID plus Product_ID should identify a row uniquely, but the output shows 8 duplicated (Order_ID, Product_ID) pairs

Order_ID,Product_ID,item_count
CA-2017-152912,OFF-ST-10003208,2
CA-2015-103135,OFF-BI-10000069,2
US-2016-123750,TEC-AC-10004659,2
CA-2016-140571,OFF-PA-10001954,2
CA-2017-118017,TEC-AC-10002006,2
US-2014-150119,FUR-CH-10002965,2
CA-2016-137043,FUR-FU-10003664,2
CA-2016-129714,OFF-PA-10001970,2


In [None]:
%sql
SELECT * FROM Order where Order_ID = 'US-2016-123750' AND Product_ID = 'TEC-AC-10004659'
-- For the same Order_ID and Product_ID we have different quantities

Customer_ID,Discount,Order_Date,Order_ID,Price,Product_ID,Profit,Quantity,Row_ID,Ship_Date,Ship_Mode
RB-19795,0.2,2016-04-15,US-2016-123750,291.96,TEC-AC-10004659,54.7425,5,1680,2016-04-21,Standard Class
RB-19795,0.2,2016-04-15,US-2016-123750,408.744,TEC-AC-10004659,76.6395,7,3434,2016-04-21,Standard Class


Customer

In [None]:
%sql
select Customer_ID, count(1) item_count from Customer
          GROUP BY Customer_ID
          HAVING item_count > 1
          order by item_count desc
-- No duplicate Customer_ID values in Customer

Customer_ID,item_count


Product

In [None]:
%sql
select Product_ID, count(1) item_count from Product
          GROUP BY Product_ID
          HAVING item_count > 1
          order by item_count desc
-- We can see that 33 product IDs are duplicated

Product_ID,item_count
OFF-PA-10000477,2
FUR-FU-10004270,2
TEC-MA-10001148,2
OFF-BI-10004654,2
FUR-FU-10004017,2
OFF-PA-10001166,2
OFF-PA-10002195,2
OFF-AR-10001149,2
FUR-BO-10002213,2
TEC-PH-10004531,2


In [None]:
%sql
select * from Product where  Product_ID = 'OFF-BI-10002026'
-- Here the Product_ID is the same but the Product_Name differs, so retrieving product information by Product_ID alone returns multiple rows for a few IDs

Product_ID,Category,Sub_Category,Product_Name,State,Price_per_product
OFF-BI-10002026,Office Supplies,Binders,Ibico Recycled Linen-Style Covers,California,31.248
OFF-BI-10002026,Office Supplies,Binders,Avery Arch Ring Binders,California,46.48


#Question:2

In [None]:
# Enriched customer/product view: customer attributes next to the attributes of
# each product the customer ordered, linked through the order table.
# Every join carries its own ON clause; leaving the first join without one
# makes it an implicit cross join that is only constrained by the last ON.
df_enriched_customer_products = spark.sql(
    """
    Select 
    c.Customer_ID
    ,c.Customer_Name
    ,c.email
    ,c.phone
    ,c.address
    ,c.Segment
    ,c.Country
    ,c.City
    ,c.State as Customer_State
    ,c.Postal_Code
    ,c.Region
    ,p.Product_ID
    ,p.Category
    ,p.Sub_Category
    ,p.Product_Name
    ,p.State as Product_State
    ,p.Price_per_product
    -- ,o.Order_ID
    from customer as c
    join `order` as o
    on c.Customer_ID = o.Customer_ID
    join product as p
    on p.Product_ID = o.Product_ID
    """)

df_enriched_customer_products.show(truncate=False)

+-----------+------------------+--------------------------------+----------+----------------------------------------------------------+-----------+-------------+-------------+--------------+-----------+-------+---------------+---------------+------------+---------------------------------------------------------------+-------------+-----------------+
|Customer_ID|Customer_Name     |email                           |phone     |address                                                   |Segment    |Country      |City         |Customer_State|Postal_Code|Region |Product_ID     |Category       |Sub_Category|Product_Name                                                   |Product_State|Price_per_product|
+-----------+------------------+--------------------------------+----------+----------------------------------------------------------+-----------+-------------+-------------+--------------+-----------+-------+---------------+---------------+------------+-----------------------------------------

In [None]:
def test_enriched_customer_products_table():
    """Persist the enriched customer/product dataframe as a Delta table and check it.

    Checks, in order: the table exists, its column names match the expected
    list exactly (names and order), and it contains at least one row.

    Raises:
        AssertionError: If any of the checks fails.
    """
    table_name = "enriched_customer_products"

    # Save dataframe as Delta table
    df_enriched_customer_products.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable(table_name)

    # Test case: check if the table exists, via the public catalog API
    # (PySpark 3.3+) rather than the private JVM session handle
    assert spark.catalog.tableExists(table_name), f"Table {table_name} was not created"

    # Test case: check if the table has correct schema
    expected_schema = [
        'Customer_ID', 'Customer_Name', 'email', 'phone', 'address', 'Segment', 'Country', 'City', 'Customer_State',
        'Postal_Code', 'Region', 'Product_ID', 'Category', 'Sub_Category', 'Product_Name', 'Product_State',
        'Price_per_product'
    ]
    actual_schema = [field.name for field in spark.catalog.listColumns(table_name)]
    assert actual_schema == expected_schema, f"Unexpected columns in {table_name}: {actual_schema}"

    # Test case: check if the table has data
    row_count = spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
    assert row_count > 0, f"Table {table_name} is empty"


test_enriched_customer_products_table()


#Question:3

In [None]:
# Enriched order view: one row per order line with the rounded profit, the
# customer's name and country, and the product's category and sub-category.
#
# The product table holds several rows for some Product_IDs (see the
# duplicate check on Product). Joining it directly repeats the matching order
# line once per product row. Only Category and Sub_Category are needed here,
# so the join goes against the distinct (Product_ID, Category, Sub_Category)
# combinations instead.
df_enriched_order_customer_product = spark.sql(
    """
    Select 
    c.Customer_Name
    ,c.Country
    ,Round(o.Profit, 2) as Profit
    ,p.Category
    ,p.Sub_Category
    from `order` as o
    join customer as c
    on c.Customer_ID = o.Customer_ID
    join (
        select distinct Product_ID, Category, Sub_Category
        from product
    ) as p
    on p.Product_ID = o.Product_ID
    """)

df_enriched_order_customer_product.show(truncate=False)

+------------------+-------------+------+---------------+------------+
|Customer_Name     |Country      |Profit|Category       |Sub_Category|
+------------------+-------------+------+---------------+------------+
|Jay Kimmel        |United States|63.69 |Furniture      |Chairs      |
|Jay Kimmel        |United States|63.69 |Furniture      |Chairs      |
|Bil Donatelli     |United States|102.19|Technology     |Accessories |
|Laurel Beltran    |United States|-14.92|Office Supplies|Binders     |
|Karl Braun        |United States|5.64  |Office Supplies|Paper       |
|Denny Ordway      |United States|-3.0  |Technology     |Accessories |
|Cassandra Brandow |United States|38.38 |Office Supplies|Binders     |
|Sally Matthias    |United States|5.23  |Office Supplies|Paper       |
|Rick Duston       |United States|5.19  |Furniture      |Furnishings |
|Justin MacKendrick|United States|214.0 |Office Supplies|Storage     |
|Scot Coram        |United States|4.18  |Office Supplies|Storage     |
|Bil O

In [None]:
def test_enriched_order_customer_product_table():
    """Persist the enriched order dataframe as a Delta table and check it.

    Checks, in order: the table exists, its column names match the expected
    list exactly (names and order), and it contains at least one row.

    Raises:
        AssertionError: If any of the checks fails.
    """
    table_name = "enriched_order_customer_product"

    # Save dataframe as Delta table
    df_enriched_order_customer_product.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable(table_name)

    # Test case: check if the table exists, via the public catalog API
    # (PySpark 3.3+) rather than the private JVM session handle
    assert spark.catalog.tableExists(table_name), f"Table {table_name} was not created"

    # Test case: check if the table has correct schema
    expected_schema = ["Customer_Name", "Country", "Profit", "Category", "Sub_Category"]
    actual_schema = [field.name for field in spark.catalog.listColumns(table_name)]
    assert actual_schema == expected_schema, f"Unexpected columns in {table_name}: {actual_schema}"

    # Test case: check if the table has data
    row_count = spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
    assert row_count > 0, f"Table {table_name} is empty"


test_enriched_order_customer_product_table()


#Question:4

In [None]:
# Profit aggregated by order year, product category, product sub-category and
# customer.
#
# The product table holds several rows for some Product_IDs (see the
# duplicate check on Product). Joining it directly repeats the matching order
# line once per product row, so sum(Profit) counts that profit more than
# once. Only Category and Sub_Category are needed here, so the join goes
# against the distinct (Product_ID, Category, Sub_Category) combinations.
df_profit_by_year_pc_psc_customer = spark.sql("""
                  Select 
                  sum(o.Profit) as Aggregate_Profit
                  ,year(o.Order_Date) as Year
                  ,p.Category
                  ,p.Sub_Category
                  ,c.Customer_ID
                  ,c.Customer_Name
                  from `order` as o
                  join (
                      select distinct Product_ID, Category, Sub_Category
                      from product
                  ) as p
                  on p.Product_ID = o.Product_ID
                  join customer as c
                  on c.Customer_ID = o.Customer_ID
                  group by
                  year(o.Order_Date)
                  ,p.Category
                  ,p.Sub_Category
                  ,c.Customer_ID
                  ,c.Customer_Name
                  order by Year desc
                  """)

df_profit_by_year_pc_psc_customer.show(truncate=False)

+-----------------+----+---------------+------------+-----------+--------------------+
|Aggregate_Profit |Year|Category       |Sub_Category|Customer_ID|Customer_Name       |
+-----------------+----+---------------+------------+-----------+--------------------+
|133.0            |2017|Furniture      |Furnishings |MH-17290   |Marc Harrigan       |
|99.9012          |2017|Office Supplies|Appliances  |BM-11140   |Becky Martin        |
|5.1042           |2017|Office Supplies|Envelopes   |CC-12220   |Chris Cortes        |
|2.0416           |2017|Office Supplies|Art         |TA-21385   |Tom Ashbrook        |
|11.998           |2017|Technology     |Accessories |HP-14815   |Harold Pawlan       |
|-32.2192         |2017|Furniture      |Bookcases   |DW-13585   |Dorothy Wardle      |
|107.9892         |2017|Technology     |Accessories |CS-12355   |Christine Sundaresam|
|268.347          |2017|Office Supplies|Paper       |TH-21550   |Tracy Hopkins       |
|170.2833         |2017|Office Supplies|Sto

In [None]:
def test_profit_by_year_pc_psc_customer_table():
    """Persist the aggregated-profit dataframe as a Delta table and check it.

    Checks, in order: the table exists, its column names match the expected
    list exactly (names and order), and it contains at least one row.

    Raises:
        AssertionError: If any of the checks fails.
    """
    table_name = "profit_by_year_pc_psc_customer"

    # Save dataframe as Delta table
    df_profit_by_year_pc_psc_customer.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable(table_name)

    # Test case: check if the table exists, via the public catalog API
    # (PySpark 3.3+) rather than the private JVM session handle
    assert spark.catalog.tableExists(table_name), f"Table {table_name} was not created"

    # Test case: check if the table has correct schema
    expected_schema = ["Aggregate_Profit", "Year", "Category", "Sub_Category", "Customer_ID", "Customer_Name"]
    actual_schema = [field.name for field in spark.catalog.listColumns(table_name)]
    assert actual_schema == expected_schema, f"Unexpected columns in {table_name}: {actual_schema}"

    # Test case: check if the table has data
    row_count = spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
    assert row_count > 0, f"Table {table_name} is empty"


test_profit_by_year_pc_psc_customer_table()

#Question:5a

In [None]:
def test_profit_by_year():
    """Aggregate profit per year from the profit table, display it and check it.

    Checks that the result columns are exactly Profit_By_Year and Year, and
    that the number of distinct years in the result equals the number of
    distinct order years in the order table.
    """
    yearly_profit_sql = """
      Select 
      sum(Aggregate_Profit) as Profit_By_Year
      ,Year
      from profit_by_year_pc_psc_customer
      group by
      Year
      order by Year desc
      """
    yearly_profit = spark.sql(yearly_profit_sql)
    yearly_profit.show(truncate=False)

    # Column names and their order must match exactly
    assert yearly_profit.columns == ["Profit_By_Year", "Year"]

    # Distinct years in the result versus distinct order years in the source
    years_in_result = yearly_profit.select('Year').distinct().count()
    years_in_orders = spark.sql("Select count(distinct year(Order_Date)) from order").collect()[0][0]
    assert years_in_result == years_in_orders

test_profit_by_year()

+------------------+----+
|Profit_By_Year    |Year|
+------------------+----+
|127175.11320000011|2017|
|68161.4049000001  |2016|
|65706.34270000012 |2015|
|40975.45719999994 |2014|
+------------------+----+



#Question:5b

In [None]:


def test_profit_by_year_cat():
    """Aggregate profit per year and product category, display it and check it.

    Checks that the result columns are exactly Profit_By_Year_And_Prd_Cat,
    Year and Product_Category, and that the number of distinct
    (Year, Product_Category) pairs in the result equals the number of
    distinct (Year, Category) pairs in the profit table.
    """
    year_category_sql = """
      Select 
      sum(Aggregate_Profit) as Profit_By_Year_And_Prd_Cat
      ,Year
      ,Category as Product_Category
      from profit_by_year_pc_psc_customer
      group by
      Year
      ,Product_Category
      order by Year desc
      """
    year_category_profit = spark.sql(year_category_sql)
    year_category_profit.show(truncate=False)

    # Column names and their order must match exactly
    assert year_category_profit.columns == ["Profit_By_Year_And_Prd_Cat", "Year", "Product_Category"]

    # Distinct grouping keys in the result versus in the source table
    pairs_in_result = year_category_profit.select('Year','Product_Category').distinct().count()
    pairs_in_source = spark.sql("Select count(distinct Year, Category) from profit_by_year_pc_psc_customer").collect()[0][0]
    assert pairs_in_result == pairs_in_source

test_profit_by_year_cat()

+--------------------------+----+----------------+
|Profit_By_Year_And_Prd_Cat|Year|Product_Category|
+--------------------------+----+----------------+
|45330.59050000009         |2017|Office Supplies |
|78482.83109999997         |2017|Technology      |
|3361.6915999999983        |2017|Furniture       |
|24437.399599999986        |2016|Technology      |
|7750.212200000004         |2016|Furniture       |
|35973.79310000002         |2016|Office Supplies |
|25490.43370000002         |2015|Office Supplies |
|3392.1574                 |2015|Furniture       |
|36823.75160000004         |2015|Technology      |
|23486.18540000001         |2014|Technology      |
|-5174.653799999998        |2014|Furniture       |
|22663.92559999998         |2014|Office Supplies |
+--------------------------+----+----------------+



#Question:5c

In [None]:
def test_profit_by_customer():
    """Aggregate profit per customer, display it and check it.

    Checks that the result columns are exactly Profit_By_Customer,
    Customer_ID and Customer_Name, and that the number of distinct
    Customer_ID values in the result equals the number of distinct
    Customer_ID values in the profit table.
    """
    customer_profit_sql = """
      Select 
      sum(Aggregate_Profit) as Profit_By_Customer
      ,Customer_ID
      ,Customer_Name
      from profit_by_year_pc_psc_customer
      group by
      Customer_ID
      ,Customer_Name
      order by Customer_ID
      """
    customer_profit = spark.sql(customer_profit_sql)
    customer_profit.show(truncate=False)

    # Column names and their order must match exactly
    assert customer_profit.columns == ["Profit_By_Customer", "Customer_ID", "Customer_Name"]

    # Distinct customers in the result versus in the source table
    customers_in_result = customer_profit.select('Customer_ID').distinct().count()
    customers_in_source = spark.sql("Select count(distinct Customer_ID) from profit_by_year_pc_psc_customer").collect()[0][0]
    assert customers_in_result == customers_in_source

test_profit_by_customer()

+-------------------+-----------+--------------------+
|Profit_By_Customer |Customer_ID|Customer_Name       |
+-------------------+-----------+--------------------+
|-273.40890000000013|AA-10315   |Alex Avila          |
|277.3824           |AA-10375   |Allen Armold        |
|445.96940000000006 |AA-10480   |Andrew Allen        |
|807.8329           |AA-10645   |Anna Andreadi       |
|129.6821           |AB-10015   |Aaron Bergman       |
|2047.8491000000001 |AB-10060   |Adam Bellavance     |
|5483.749           |AB-10105   |Adrian Barton       |
|320.6815           |AB-10150   |Aimee Bixby         |
|215.36410000000004 |AB-10165   |Alan Barnes         |
|264.56749999999994 |AB-10255   |Alejandro Ballentine|
|-275.286           |AB-10600   |Ann Blume           |
|-62.1342           |AC-10420   |Alyssa Crouse       |
|1730.0927000000001 |AC-10450   |Amy Cox             |
|298.61330000000004 |AC-10615   |Ann Chong           |
|-28.700399999999995|AC-10660   |Anna Chung          |
|1869.9293

#Question:5d

In [None]:
def test_profit_by_year_customer():
    """Aggregate profit per year and customer, display it and check it.

    Checks that the result columns are exactly Profit_By_Year_And_Customer,
    Year, Customer_ID and Customer_Name, and that the number of distinct
    (Year, Customer_ID) pairs in the result equals the number of distinct
    (Year, Customer_ID) pairs in the profit table.
    """
    year_customer_sql = """
      Select 
      sum(Aggregate_Profit) as Profit_By_Year_And_Customer
      ,Year
      ,Customer_ID
      ,Customer_Name
      from profit_by_year_pc_psc_customer
      group by
      Year
      ,Customer_ID
      ,Customer_Name
      order by Year desc
      """
    year_customer_profit = spark.sql(year_customer_sql)
    year_customer_profit.show(truncate=False)

    # Column names and their order must match exactly
    assert year_customer_profit.columns == ["Profit_By_Year_And_Customer", "Year", "Customer_ID", "Customer_Name"]

    # Distinct grouping keys in the result versus in the source table
    pairs_in_result = year_customer_profit.select('Year', 'Customer_ID').distinct().count()
    pairs_in_source = spark.sql("Select count(distinct Year, Customer_ID) from profit_by_year_pc_psc_customer").collect()[0][0]
    assert pairs_in_result == pairs_in_source

test_profit_by_year_customer()

+---------------------------+----+-----------+------------------+
|Profit_By_Year_And_Customer|Year|Customer_ID|Customer_Name     |
+---------------------------+----+-----------+------------------+
|10.4754                    |2017|EM-13810   |null              |
|33.7788                    |2017|CC-12610   |Corey Catlett     |
|221.47639999999998         |2017|MK-18160   |Mike Kennedy      |
|74.228                     |2017|CB-12415   |Christy Brittain  |
|108.05359999999999         |2017|MH-17620   |Matt Hagelstein   |
|397.7298                   |2017|MG-18145   |Mike Gockenbach   |
|1210.7213                  |2017|KF-16285   |Karen Ferguson    |
|36.9728                    |2017|JF-15490   |Jeremy Farry      |
|423.7296                   |2017|EJ-13720   |Ed Jacobs         |
|56.8938                    |2017|EP-13915   |Emily Phan        |
|196.45880000000002         |2017|VP-21730   |Victor Preis      |
|-83.4279                   |2017|SC-20770   |Stewart Carmichael|
|164.91330