### a)Create a new Spark Session with new SparkConfig

In [171]:
sc.stop()
spark.stop()

In [172]:
from pyspark import SparkConf, SparkContext
#setMaster() = Set Spark Content Manager which is local[cpu cores]
config = SparkConf().setMaster('local[2]').setAppName("Assignment7")
sc = SparkContext(conf = config)
sc

### b)Create new instance of Spark SQL session and define new DataFrame using sales_data_sample.csv dataset.

In [173]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SQLSession').getOrCreate()
spark

In [174]:
sales_df = spark.read.csv("file:///home/hadoop/Downloads/sales_data_sample.csv", header=True, inferSchema = True)

In [146]:
sales_df.printSchema()

root
 |-- ORDERNUMBER: integer (nullable = true)
 |-- QUANTITYORDERED: integer (nullable = true)
 |-- PRICEEACH: double (nullable = true)
 |-- ORDERLINENUMBER: integer (nullable = true)
 |-- SALES: double (nullable = true)
 |-- ORDERDATE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- QTR_ID: integer (nullable = true)
 |-- MONTH_ID: integer (nullable = true)
 |-- YEAR_ID: integer (nullable = true)
 |-- PRODUCTLINE: string (nullable = true)
 |-- MSRP: integer (nullable = true)
 |-- PRODUCTCODE: string (nullable = true)
 |-- CUSTOMERNAME: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- ADDRESSLINE1: string (nullable = true)
 |-- ADDRESSLINE2: string (nullable = false)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = false)
 |-- POSTALCODE: string (nullable = false)
 |-- COUNTRY: string (nullable = true)
 |-- TERRITORY: string (nullable = true)
 |-- CONTACTLASTNAME: string (nullable = true)
 |-- CONTACTFIRSTNAME: string (nullable =

### c)Find the shape of DataFrame.

In [10]:
sales_df.count()

2823

In [11]:
len(sales_df.columns)

25

In [181]:
sales_df.show()

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|   STATE|POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+--------+----------+---------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2| 2871.0| 2/24/2003

### d) Find the Summary of DataFrame for all numerical data columns.

In [13]:
from pyspark.sql.types import *
num_cols = [field.name for field in sales_df.schema.fields if isinstance(field.dataType, (IntegerType,FloatType, LongType, DoubleType, DecimalType))]
num_cols

['ORDERNUMBER',
 'QUANTITYORDERED',
 'PRICEEACH',
 'ORDERLINENUMBER',
 'SALES',
 'QTR_ID',
 'MONTH_ID',
 'YEAR_ID',
 'MSRP']

In [14]:
sales_df.describe(num_cols).show()

+-------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary|       ORDERNUMBER|  QUANTITYORDERED|         PRICEEACH|  ORDERLINENUMBER|             SALES|            QTR_ID|          MONTH_ID|           YEAR_ID|              MSRP|
+-------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|  count|              2823|             2823|              2823|             2823|              2823|              2823|              2823|              2823|              2823|
|   mean|10258.725115125753|35.09280906836698| 83.65854410201929|6.466170740347148|  3553.88907190932|2.7176762309599716|7.0924548352816155|2003.8150903294368|100.71555083244775|
| stddev|  92.0854775957196| 9.74144273706958|20.174276527840536| 4.22584096469094|1841.8651057401842| 1.

### e)Identify and handle missing or null values in the columns.

In [15]:
from pyspark.sql.functions import *

In [16]:
for columns in sales_df.columns:
    print({columns:sales_df.filter(col(columns).isNull()).count()})

{'ORDERNUMBER': 0}
{'QUANTITYORDERED': 0}
{'PRICEEACH': 0}
{'ORDERLINENUMBER': 0}
{'SALES': 0}
{'ORDERDATE': 0}
{'STATUS': 0}
{'QTR_ID': 0}
{'MONTH_ID': 0}
{'YEAR_ID': 0}
{'PRODUCTLINE': 0}
{'MSRP': 0}
{'PRODUCTCODE': 0}
{'CUSTOMERNAME': 0}
{'PHONE': 0}
{'ADDRESSLINE1': 0}
{'ADDRESSLINE2': 2521}
{'CITY': 0}
{'STATE': 1486}
{'POSTALCODE': 76}
{'COUNTRY': 0}
{'TERRITORY': 0}
{'CONTACTLASTNAME': 0}
{'CONTACTFIRSTNAME': 0}
{'DEALSIZE': 0}


In [175]:
sales_df = sales_df.fillna({'ADDRESSLINE2': "", 'STATE':"", 'POSTALCODE': ""})

In [18]:
for columns in sales_df.columns:
    print({columns:sales_df.filter(col(columns).isNull()).count()})

{'ORDERNUMBER': 0}
{'QUANTITYORDERED': 0}
{'PRICEEACH': 0}
{'ORDERLINENUMBER': 0}
{'SALES': 0}
{'ORDERDATE': 0}
{'STATUS': 0}
{'QTR_ID': 0}
{'MONTH_ID': 0}
{'YEAR_ID': 0}
{'PRODUCTLINE': 0}
{'MSRP': 0}
{'PRODUCTCODE': 0}
{'CUSTOMERNAME': 0}
{'PHONE': 0}
{'ADDRESSLINE1': 0}
{'ADDRESSLINE2': 0}
{'CITY': 0}
{'STATE': 0}
{'POSTALCODE': 0}
{'COUNTRY': 0}
{'TERRITORY': 0}
{'CONTACTLASTNAME': 0}
{'CONTACTFIRSTNAME': 0}
{'DEALSIZE': 0}


### f) Calculate the total revenue generated per country by combining the columns QUANTITYORDERED and PRICEEACH using Spark DataFrame operations?

In [176]:
sales_df1 = sales_df.withColumn("REVENUE",
                                 col("QUANTITYORDERED") * col("PRICEEACH"))

In [169]:
sales_df2 = sales_df1.groupBy("COUNTRY").agg(sum("REVENUE").alias("TOTAL_REVENUE")).orderBy(desc("TOTAL_REVENUE"))

In [170]:
sales_df2.show()

+-----------+------------------+
|    COUNTRY|     TOTAL_REVENUE|
+-----------+------------------+
|        USA|2986425.2099999995|
|      Spain|1021705.9700000002|
|     France| 919257.8499999997|
|  Australia|521598.45999999985|
|         UK|413203.33999999997|
|      Italy| 309402.8699999999|
|    Finland|268714.70000000007|
|     Norway| 246115.8000000001|
|  Singapore| 227985.5000000001|
|     Canada|193504.34000000003|
|    Denmark|         192747.63|
|    Germany|         178689.08|
|     Sweden|174264.10000000006|
|    Austria|172793.05000000002|
|      Japan|153076.68999999994|
|    Belgium|          94528.88|
|Switzerland| 93344.90999999999|
|Philippines| 80291.16999999998|
|    Ireland|          43237.24|
+-----------+------------------+



### g) Determine the top 5 products with the highest total sales revenue using Spark DataFrame?

In [22]:
sales_df3 = sales_df1.orderBy(col('REVENUE').desc())
sales_df3.select(['PRODUCTLINE','PRODUCTCODE','REVENUE']).show(5)

+------------+-----------+-------+
| PRODUCTLINE|PRODUCTCODE|REVENUE|
+------------+-----------+-------+
|Classic Cars|   S12_4675|9048.16|
|Vintage Cars|   S18_1749| 7600.0|
|Classic Cars|   S24_3856| 7600.0|
|      Planes|  S700_2466|7543.75|
|Classic Cars|   S24_2766| 7182.0|
+------------+-----------+-------+
only showing top 5 rows



### h) Find the average order quantity for each product using groupBy and agg operations?

In [166]:
sales_df1.groupBy("PRODUCTCODE").agg(avg("QUANTITYORDERED").alias("AVG_QUANTITY")).show()

+-----------+------------------+
|PRODUCTCODE|      AVG_QUANTITY|
+-----------+------------------+
|   S18_4600| 38.18518518518518|
|   S18_1749| 36.45454545454545|
|   S12_3891| 35.42307692307692|
|   S18_2248| 33.77272727272727|
|  S700_1138| 34.69230769230769|
|   S32_1268|32.333333333333336|
|   S12_1099|             33.52|
|   S18_2795|30.346153846153847|
|   S24_1937|             33.76|
|   S32_3522| 35.44444444444444|
|   S18_1097| 35.67857142857143|
|   S18_1662| 36.15384615384615|
|   S12_1666|34.714285714285715|
|   S24_3969| 33.86363636363637|
|   S24_1578| 35.80769230769231|
|   S24_4048| 32.46153846153846|
|   S18_3320| 34.96153846153846|
|   S24_3816| 33.46153846153846|
|   S18_3136|32.333333333333336|
|   S32_2509|34.107142857142854|
+-----------+------------------+
only showing top 20 rows



### i) Using Spark DataFrame, filter orders where the SALES value exceeds 10,000 and sort theresults by the ORDERDATE column?

In [182]:
sales_df1 = sales_df1.withColumn("ORDERDATE", to_date(col("ORDERDATE"), "M/d/yyyy"))

In [183]:
filtered_df = sales_df1.filter(col("SALES") > 10000).orderBy(col('ORDERDATE'))
filtered_df.select(['ORDERNUMBER','SALES','ORDERDATE']).show()

+-----------+-------+----------+
|ORDERNUMBER|  SALES| ORDERDATE|
+-----------+-------+----------+
|      10127|11279.2|2003-06-03|
|      10150|10993.5|2003-09-19|
|      10247|10606.2|2004-05-05|
|      10304|10172.7|2004-10-11|
|      10312|11623.7|2004-10-21|
|      10322|12536.5|2004-11-04|
|      10333|11336.7|2004-11-18|
|      10339|10758.0|2004-11-23|
|      10375|10039.6|2005-02-03|
|      10388|10066.6|2005-03-03|
|      10403|11886.6|2005-04-08|
|      10405|11739.7|2005-04-14|
|      10406|10468.9|2005-04-15|
|      10407|14082.8|2005-04-22|
|      10412|11887.8|2005-05-03|
|      10424|12001.0|2005-05-31|
+-----------+-------+----------+



### j) Filter out rows where the STATUS is &#39;Cancelled&#39; and calculate the total sales from the remaining orders?

In [26]:
cancelled_df = sales_df1.filter(col('STATUS') != 'Cancelled')
total_sales = cancelled_df.agg(sum(col('SALES')).alias('total_sales'))
total_sales.show()

+-----------------+
|      total_sales|
+-----------------+
|9838141.370000018|
+-----------------+



### k) Use Spark Data Frame transformations to derive the yearly sales for each customer (CUSTOMERNAME) based on the ORDERDATE column?

In [41]:
sales_df2 = sales_df1.withColumn('ORDER_YEAR', year(col('ORDERDATE')))

In [49]:
yearly_sales = (sales_df2.groupBy("CUSTOMERNAME","ORDER_YEAR")
                .agg(sum(col('SALES')).alias('yearly_sales')).orderBy("CUSTOMERNAME","ORDER_YEAR"))

In [51]:
yearly_sales.show(10)

+--------------------+----------+------------------+
|        CUSTOMERNAME|ORDER_YEAR|      yearly_sales|
+--------------------+----------+------------------+
|      AV Stores, Co.|      2003| 51017.91999999999|
|      AV Stores, Co.|      2004|         106789.89|
|        Alpha Cognac|      2003| 55349.31999999999|
|        Alpha Cognac|      2005|15139.119999999999|
|  Amica Models & Co.|      2004| 94117.26000000002|
|Anna's Decoration...|      2003| 88983.70999999999|
|Anna's Decoration...|      2005|          65012.42|
|   Atelier graphique|      2003|           16560.3|
|   Atelier graphique|      2004|           7619.66|
|Australian Collec...|      2003|          37878.55|
+--------------------+----------+------------------+
only showing top 10 rows



### l) Add a new column to the DataFrame that categorizes orders as"High", "Medium" or "Low" sales based on the SALES value?

In [59]:
sales_df4 = sales_df1.withColumn(
    'SALES_CATEGORY',
    when(col('SALES') > 8000, 'High')
    .when(col('SALES') > 3000, 'Medium')
    .otherwise('Low')
)
sales_df4.select(['SALES','CUSTOMERNAME','SALES_CATEGORY']).show(30)

+-------+--------------------+--------------+
|  SALES|        CUSTOMERNAME|SALES_CATEGORY|
+-------+--------------------+--------------+
| 2871.0|   Land of Toys Inc.|           Low|
| 2765.9|  Reims Collectables|           Low|
|3884.34|     Lyon Souveniers|        Medium|
| 3746.7|   Toys4GrownUps.com|        Medium|
|5205.27|Corporate Gift Id...|        Medium|
|3479.76|Technics Stores Inc.|        Medium|
|2497.77|Daedalus Designs ...|           Low|
|5512.32|        Herkku Gifts|        Medium|
|2168.54|     Mini Wheels Co.|           Low|
|4708.44|    Auto Canal Petit|        Medium|
|3965.66|Australian Collec...|        Medium|
|2333.12|     Vitachrome Inc.|           Low|
|3188.64|Tekni Collectable...|        Medium|
|3676.76|     Gift Depot Inc.|        Medium|
|4177.35|   La Rochelle Gifts|        Medium|
|4099.68|Marta's Replicas Co.|        Medium|
|2597.39|Toys of Finland, Co.|           Low|
|4394.38|  Baane Mini Imports|        Medium|
|4358.04|Diecast Classics ...|    

### m) Assume, If you have another DataFrame with customer demographic data, how would you perform a join to compute the total sales per demographic group?

In [147]:
data = [
    ("Land of Toys Inc.", "Retail"),
    ("Reims Collectables", "Retail"),
    ("Lyon Souveniers", "Retail"),
    ("Toys4GrownUps.com", "Wholesale"),
    ("Corporate Gift Ideas", "Wholesale")]
columns = ["CUSTOMERNAME", "DEMOGRAPHIC_GROUP"]
demographic_df = spark.createDataFrame(data,columns)
demographic_df.show()

+--------------------+-----------------+
|        CUSTOMERNAME|DEMOGRAPHIC_GROUP|
+--------------------+-----------------+
|   Land of Toys Inc.|           Retail|
|  Reims Collectables|           Retail|
|     Lyon Souveniers|           Retail|
|   Toys4GrownUps.com|        Wholesale|
|Corporate Gift Ideas|        Wholesale|
+--------------------+-----------------+



In [118]:
join_df = sales_df1.join(demographic_df, on='CUSTOMERNAME', how='inner')
sales_demographic = join_df.groupBy('DEMOGRAPHIC_GROUP').agg(sum('SALES').alias('TOTAL_SALES'))
sales_demographic.show()

+-----------------+------------------+
|DEMOGRAPHIC_GROUP|       TOTAL_SALES|
+-----------------+------------------+
|        Wholesale|104561.95999999998|
|           Retail|         377682.72|
+-----------------+------------------+



### n) Can you implement a cumulative distribution function (CDF) over the SALES value for each CUSTOMERNAME? What insights can you gather from analyzing the CDF distribution for each customer?

In [63]:
from pyspark.sql import Window

In [68]:
window_df = Window.partitionBy('CUSTOMERNAME').orderBy('SALES')
ranked_df = sales_df1.withColumn('RANK', row_number().over(window_df))
total_sales = sales_df1.groupBy('CUSTOMERNAME').agg(count('SALES').alias('TOTAL_COUNT'))
sales_new = ranked_df.join(total_sales, on='CUSTOMERNAME')
sales_cdf = sales_new.withColumn('CDF', col('RANK') / col('TOTAL_COUNT'))

In [73]:
sales_cdf.select('CUSTOMERNAME', 'SALES', 'RANK','TOTAL_COUNT','CDF').show(60)

+--------------------+-------+----+-----------+--------------------+
|        CUSTOMERNAME|  SALES|RANK|TOTAL_COUNT|                 CDF|
+--------------------+-------+----+-----------+--------------------+
| Suominen Souveniers| 891.03|   1|         30| 0.03333333333333333|
| Suominen Souveniers| 1086.6|   2|         30| 0.06666666666666667|
| Suominen Souveniers|1103.76|   3|         30|                 0.1|
| Suominen Souveniers|1629.04|   4|         30| 0.13333333333333333|
| Suominen Souveniers| 1988.4|   5|         30| 0.16666666666666666|
| Suominen Souveniers|2140.11|   6|         30|                 0.2|
| Suominen Souveniers|2447.76|   7|         30| 0.23333333333333334|
| Suominen Souveniers|2632.89|   8|         30| 0.26666666666666666|
| Suominen Souveniers| 2773.8|   9|         30|                 0.3|
| Suominen Souveniers|2775.08|  10|         30|  0.3333333333333333|
| Suominen Souveniers|2817.87|  11|         30| 0.36666666666666664|
| Suominen Souveniers|2851.84|  12

Insight : Higher sales values correspond to higher CDF values. This shows a non-decreasing trend as sales increase.

### o) Write spark dataframe code to rank products by total revenue within each country (COUNTRY)?

In [184]:
revenue_df = sales_df1.groupBy('COUNTRY', 'PRODUCTCODE','PRODUCTLINE') \
    .agg(sum('SALES').alias('TOTAL_REVENUE'))
new_df = Window.partitionBy('COUNTRY').orderBy(col('TOTAL_REVENUE').desc())
ranked_df = revenue_df.withColumn('RANK', rank().over(new_df))
ranked_df.show(40)

+-------+-----------+----------------+------------------+----+
|COUNTRY|PRODUCTCODE|     PRODUCTLINE|     TOTAL_REVENUE|RANK|
+-------+-----------+----------------+------------------+----+
| Sweden|   S10_1949|    Classic Cars|           14345.3|   1|
| Sweden|   S18_4600|Trucks and Buses|          12052.11|   2|
| Sweden|   S24_2300|Trucks and Buses|10926.119999999999|   3|
| Sweden|   S12_1099|    Classic Cars|           9451.15|   4|
| Sweden|   S18_2949|    Vintage Cars|           8253.68|   5|
| Sweden|   S24_2011|           Ships|           8145.45|   6|
| Sweden|   S10_4962|    Classic Cars|           7044.02|   7|
| Sweden|   S18_2625|     Motorcycles|            6981.0|   8|
| Sweden|   S12_3380|    Classic Cars|            6659.8|   9|
| Sweden|   S12_1666|Trucks and Buses|            6387.8|  10|
| Sweden|   S10_4757|    Classic Cars|           5924.16|  11|
| Sweden|   S18_2319|Trucks and Buses|           5814.86|  12|
| Sweden|   S18_1662|          Planes|           5763.7

Insight : In Sweden , Classic Cars has the highest sales revenue.

### p) Calculate a running total of SALES for each customer and show the top 5 customers by this cumulative total?

In [157]:
cumulative = Window.orderBy('TOTAL_SALES').rowsBetween(Window.unboundedPreceding,Window.currentRow)
total_df = (sales_df1.select('CUSTOMERNAME','SALES').groupBy('CUSTOMERNAME')
         .agg(round(sum('SALES'),2).alias('TOTAL_SALES')))
total_df = total_df.withColumn('CUMULATIVE_SUM',round(sum('TOTAL_SALES').over(cumulative),2))
total_df.orderBy('CUMULATIVE_SUM',ascending=False).limit(5).show()

+--------------------+-----------+--------------+
|        CUSTOMERNAME|TOTAL_SALES|CUMULATIVE_SUM|
+--------------------+-----------+--------------+
|Euro Shopping Cha...|  912294.11| 1.003262885E7|
|Mini Gifts Distri...|  654858.06|    9120334.74|
|Australian Collec...|  200995.41|    8465476.68|
|  Muscle Machine Inc|  197736.94|    8264481.27|
|   La Rochelle Gifts|   180124.9|    8066744.33|
+--------------------+-----------+--------------+



### q) Find and handle Invalid and Outliers values in entire DataFrame. [Check for only continuous dataset].

In [185]:
def calculate_iqr(sales_df1, column_name):
    q1 = sales_df1.approxQuantile(column_name, [0.25], 0.001)[0]
    q3 = sales_df1.approxQuantile(column_name, [0.75], 0.001)[0]
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    new_sales = sales_df1.filter((col(column_name) >= lower_bound) & 
                                 (col(column_name) <= upper_bound))
    return new_sales.withColumn(column_name,when(col(column_name) < lower_bound, lower_bound)
        .when(col(column_name) > upper_bound, upper_bound).otherwise(col(column_name)))

handled_df = calculate_iqr(sales_df1, "SALES")
handled_df.show(4)

+-----------+---------------+---------+---------------+-------+----------+-------+------+--------+-------+-----------+----+-----------+------------------+----------------+--------------------+------------+--------+-----+----------+-------+---------+---------------+----------------+--------+------------------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES| ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|      CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|    CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|           REVENUE|
+-----------+---------------+---------+---------------+-------+----------+-------+------+--------+-------+-----------+----+-----------+------------------+----------------+--------------------+------------+--------+-----+----------+-------+---------+---------------+----------------+--------+------------------+
|      10107|             30|     95.7|              2| 2871.0|2003

### r) How would you cache a DataFrame containing sales data from the top 10 countries by sales to avoid recomputation in subsequent transformations? What persistence level (e.g. MEMORY_ONLY, MEMORY_AND_DISK) would you choose and why?

In [158]:
country_df = sales_df1.groupBy('COUNTRY') \
    .agg(sum('SALES').alias('TOTAL_SALES')) \
    .orderBy(desc('TOTAL_SALES')).limit(10)

top_countries = [row['COUNTRY'] for row in country_df.collect()]
top_sales = sales_df1.filter(col('COUNTRY').isin(top_countries))

In [87]:
top_sales.cache()

DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: date, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string, REVENUE: double]

In [90]:
top_sales.persist()

DataFrame[ORDERNUMBER: int, QUANTITYORDERED: int, PRICEEACH: double, ORDERLINENUMBER: int, SALES: double, ORDERDATE: date, STATUS: string, QTR_ID: int, MONTH_ID: int, YEAR_ID: int, PRODUCTLINE: string, MSRP: int, PRODUCTCODE: string, CUSTOMERNAME: string, PHONE: string, ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, STATE: string, POSTALCODE: string, COUNTRY: string, TERRITORY: string, CONTACTLASTNAME: string, CONTACTFIRSTNAME: string, DEALSIZE: string, REVENUE: double]

Choosed memory only persistence level because the dataset can be fitted in the memory.

### s) How would you pivot the data to show PRODUCTLINE as columns and the total SALES for each ORDERDATE as the values? What are the implications of pivoting large datasets in Spark?

In [89]:
pivot_df = sales_df1.groupBy('ORDERDATE').pivot('PRODUCTLINE').agg(sum('SALES'))
pivot_df.show(10)

+----------+------------------+------------------+------------------+-------+--------+----------------+------------------+
| ORDERDATE|      Classic Cars|       Motorcycles|            Planes|  Ships|  Trains|Trucks and Buses|      Vintage Cars|
+----------+------------------+------------------+------------------+-------+--------+----------------+------------------+
|2003-11-11|           7557.04|41317.240000000005|              null|   null|    null|            null|              null|
|2003-07-16|              null|              null|              null|   null|    null|            null|28397.260000000002|
|2004-11-01|          14094.32|              null|          24409.53|9756.42|    null|        15508.35|22112.140000000003|
|2004-11-05|          40174.03|              null|              null|   null|11310.36|         36985.8|           17770.5|
|2003-10-11|          24159.14|              null|              null|   null|    null|            null|              null|
|2004-05-04|    

* Pivoting large datasets can lead to high memory usage. 
* Pivoted DataFrames with a high number of columns can become difficult to manage and analyze. 

### t) How would you calculate the percentage growth of total sales month over month for each PRODUCTLINE using Spark DataFrame?

In [159]:
sales_df2 = sales_df1.withColumn('ORDER_MONTH', month(col('ORDERDATE')))
sales_df2 = sales_df2.withColumn('ORDER_YEAR', year(col('ORDERDATE')))

In [160]:
month_sales = sales_df2.groupBy('ORDER_YEAR', 'ORDER_MONTH', 'PRODUCTLINE') \
    .agg(sum('SALES').alias('TOTAL_SALES')).orderBy(desc('TOTAL_SALES'))

In [161]:
new1 = Window.partitionBy('PRODUCTLINE').orderBy('ORDER_YEAR', 'ORDER_MONTH')
month_sales = month_sales.withColumn('PREV_TOTAL_SALES', lag('TOTAL_SALES').over(new1))
month_sales = month_sales.withColumn('PERCENTAGE_GROWTH',
    (col('TOTAL_SALES') - col('PREV_TOTAL_SALES')) / col('PREV_TOTAL_SALES') * 100)
month_sales.show()

+----------+-----------+-----------+------------------+------------------+-------------------+
|ORDER_YEAR|ORDER_MONTH|PRODUCTLINE|       TOTAL_SALES|  PREV_TOTAL_SALES|  PERCENTAGE_GROWTH|
+----------+-----------+-----------+------------------+------------------+-------------------+
|      2003|          2|Motorcycles|25783.760000000002|              null|               null|
|      2003|          3|Motorcycles|          12639.15|25783.760000000002| -50.98019063162239|
|      2003|          4|Motorcycles|23475.590000000004|          12639.15|   85.7370946622202|
|      2003|          5|Motorcycles|          22097.32|23475.590000000004|-5.8710771486467594|
|      2003|          6|Motorcycles|           2642.01|          22097.32| -88.04375372217082|
|      2003|          7|Motorcycles| 37924.23000000001|           2642.01|  1335.430978686682|
|      2003|          8|Motorcycles|44164.909999999996| 37924.23000000001| 16.455653812878953|
|      2003|          9|Motorcycles|           315

### u) How can you rebalance the data by portioning based on the COUNTRY column to ensure that large data partitions are avoided?

In [163]:
balanced_df = sales_df1.repartition("COUNTRY")

Insight : Here, data will be evenly distributed. So large data partitions can be thereby avoided.

### v) Suppose you have a smaller lookup table with customer details. How would you perform a broadcast join with the large sales_data_sample dataset to improve join performance? What are the key considerations when using broadcast joins?

In [139]:
customer_df = spark.read.csv("file:///home/hadoop/Downloads/Customer.csv", header = True, inferSchema = True)

In [140]:
broadcast_df = broadcast(customer_df)
new2 = sales_df.join(broadcast_df, sales_df1['CUSTOMERNAME'] == broadcast_df['CUSTOMERNAME'])
new2.show(10)

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+------------+----+-----------+--------------------+--------------+--------------------+------------+-----------+-------------+----------+-------+---------+---------------+----------------+--------+--------------------+---------+--------------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID| PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|         PHONE|        ADDRESSLINE1|ADDRESSLINE2|       CITY|        STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|        CUSTOMERNAME|TERRITORY|         PHONE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+------------+----+-----------+--------------------+--------------+--------------------+------------+-----------+-------------+----------+-------+---------+---------------+--------

Key Considerations:
* Ensure that the columnname used for joining both dataframes exists in them.
* Ensure the lookup table is small enough for the the main dataset to avoid overhead.

### w) Create a UDF that categorizes the sales values (SALES) into custom buckets like “Low”, “Medium”, “High”. Apply this UDF to the DataFrame and calculate the count of orders in each category per COUNTRY.

In [164]:
def salesUdf(sales):
    if sales < 3000:
        return "Low"
    elif sales < 4000:
        return "Medium"
    else:
        return "High"
    
sales_udf = udf(salesUdf)
sales_df_new = sales_df1.withColumn("SALES_CATEGORY",sales_udf(col("SALES")))
sales_df_new.groupBy("COUNTRY", "SALES_CATEGORY").count().orderBy("COUNTRY",desc("count")).show()

+---------+--------------+-----+
|  COUNTRY|SALES_CATEGORY|count|
+---------+--------------+-----+
|Australia|           Low|   92|
|Australia|          High|   56|
|Australia|        Medium|   37|
|  Austria|           Low|   22|
|  Austria|          High|   20|
|  Austria|        Medium|   13|
|  Belgium|           Low|   18|
|  Belgium|          High|   10|
|  Belgium|        Medium|    5|
|   Canada|           Low|   36|
|   Canada|          High|   17|
|   Canada|        Medium|   17|
|  Denmark|           Low|   26|
|  Denmark|          High|   22|
|  Denmark|        Medium|   15|
|  Finland|           Low|   41|
|  Finland|          High|   31|
|  Finland|        Medium|   20|
|   France|           Low|  144|
|   France|          High|   98|
+---------+--------------+-----+
only showing top 20 rows



### x) Create a Python UDF to calculate discounts for specific product lines. For example, give a 10% discount for Classic Cars and 5% for Motorcycles. Apply this UDF to derive new discounted sales values.

In [141]:
def discount(productline, sales):
    if productline == 'Classic Cars':
        return sales * 0.90 
    elif productline == 'Motorcycles':
        return sales * 0.95
    else:
        return sales
discount_udf = udf(discount)

In [144]:
discount_df = sales_df1.withColumn('DISCOUNTED_SALES', discount_udf(col('PRODUCTLINE'), col('SALES')))
discount_df.select("PRODUCTLINE","PRODUCTCODE","SALES","DISCOUNTED_SALES").show()

+-----------+-----------+-------+------------------+
|PRODUCTLINE|PRODUCTCODE|  SALES|  DISCOUNTED_SALES|
+-----------+-----------+-------+------------------+
|Motorcycles|   S10_1678| 2871.0|           2727.45|
|Motorcycles|   S10_1678| 2765.9|          2627.605|
|Motorcycles|   S10_1678|3884.34|          3690.123|
|Motorcycles|   S10_1678| 3746.7|          3559.365|
|Motorcycles|   S10_1678|5205.27|         4945.0065|
|Motorcycles|   S10_1678|3479.76|          3305.772|
|Motorcycles|   S10_1678|2497.77|         2372.8815|
|Motorcycles|   S10_1678|5512.32|          5236.704|
|Motorcycles|   S10_1678|2168.54|          2060.113|
|Motorcycles|   S10_1678|4708.44| 4473.017999999999|
|Motorcycles|   S10_1678|3965.66|3767.3769999999995|
|Motorcycles|   S10_1678|2333.12|          2216.464|
|Motorcycles|   S10_1678|3188.64|3029.2079999999996|
|Motorcycles|   S10_1678|3676.76|          3492.922|
|Motorcycles|   S10_1678|4177.35|         3968.4825|
|Motorcycles|   S10_1678|4099.68|          389

### y) How would you set up an incremental loading mechanism for orders placed daily based on the ORDERDATE column? How can Spark checkpointing can be used with incremental load to ensure no data loss occurs during failures?

### z) How do you implement a cumulative distribution function (CDF) over the SALES value for each CUSTOMERNAME? What insights can you gather from analyzing the CDF distribution for each customer?

In [188]:
window_df = Window.partitionBy('CUSTOMERNAME').orderBy('SALES')
ranked_df = sales_df1.withColumn('RANK', row_number().over(window_df))
total_sales = sales_df1.groupBy('CUSTOMERNAME').agg(count('SALES').alias('TOTAL_COUNT'))
sales_new = ranked_df.join(total_sales, on='CUSTOMERNAME')
sales_cdf = sales_new.withColumn('CDF', col('RANK') / col('TOTAL_COUNT'))
sales_cdf.select('CUSTOMERNAME', 'SALES', 'CDF').show(60)

+--------------------+-------+--------------------+
|        CUSTOMERNAME|  SALES|                 CDF|
+--------------------+-------+--------------------+
| Suominen Souveniers| 891.03| 0.03333333333333333|
| Suominen Souveniers| 1086.6| 0.06666666666666667|
| Suominen Souveniers|1103.76|                 0.1|
| Suominen Souveniers|1629.04| 0.13333333333333333|
| Suominen Souveniers| 1988.4| 0.16666666666666666|
| Suominen Souveniers|2140.11|                 0.2|
| Suominen Souveniers|2447.76| 0.23333333333333334|
| Suominen Souveniers|2632.89| 0.26666666666666666|
| Suominen Souveniers| 2773.8|                 0.3|
| Suominen Souveniers|2775.08|  0.3333333333333333|
| Suominen Souveniers|2817.87| 0.36666666666666664|
| Suominen Souveniers|2851.84|                 0.4|
| Suominen Souveniers|2931.98| 0.43333333333333335|
| Suominen Souveniers|3128.65|  0.4666666666666667|
| Suominen Souveniers|3288.82|                 0.5|
| Suominen Souveniers|3595.62|  0.5333333333333333|
| Suominen S

Insight - It shows a trend of increasing sales amounts. There is a steady increase in both sales and CDF values.