In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession\
                    .builder\
                    .master("spark://spark-master:7077")\
                    .appName("Day_5_Assignment")\
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/31 04:38:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/31 04:38:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
orders_schema_struct = StructType(
                                    [
                                        StructField("order_id",IntegerType()),
                                        StructField("customer_id",IntegerType()),
                                        StructField("product_id",IntegerType()),
                                        StructField("price",FloatType()),
                                        StructField("order_date",DateType()),
                                        StructField("order_status",StringType()),
                                        StructField("state",StringType()),
                                        StructField("quantity",IntegerType()),
                                    ]
                                )

df_orders = spark.read.csv('/data/orders_50mb.csv',schema=orders_schema_struct,header=True)

df_orders.show(5)

26/01/31 04:38:36 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/01/31 04:38:51 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/01/31 04:39:06 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/01/31 04:39:21 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/01/31 04:39:36 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
26/01/31 04:39:51 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure th

+--------+-----------+----------+------+----------+------------+-------+--------+
|order_id|customer_id|product_id| price|order_date|order_status|  state|quantity|
+--------+-----------+----------+------+----------+------------+-------+--------+
|       1|     192520|        70|152.33|2007-10-10|      PLACED| Odisha|       2|
|       2|     835421|        34|117.42|2008-12-02|     SHIPPED| Kerala|       3|
|       3|     159165|        13|128.85|2010-04-22|   DELIVERED|Gujarat|       3|
|       4|     403890|        25|195.71|2007-05-30|   CANCELLED|  Bihar|       1|
|       5|     273746|        41|125.08|2003-12-13|     SHIPPED| Odisha|       3|
+--------+-----------+----------+------+----------+------------+-------+--------+
only showing top 5 rows



In [6]:
# create tempview of this df_orders
df_orders.createOrReplaceTempView('sql_orders')

spark.sql(
    '''
    select * from sql_orders limit 5
    '''
).show()

+--------+-----------+----------+------+----------+------------+-------+--------+
|order_id|customer_id|product_id| price|order_date|order_status|  state|quantity|
+--------+-----------+----------+------+----------+------------+-------+--------+
|       1|     192520|        70|152.33|2007-10-10|      PLACED| Odisha|       2|
|       2|     835421|        34|117.42|2008-12-02|     SHIPPED| Kerala|       3|
|       3|     159165|        13|128.85|2010-04-22|   DELIVERED|Gujarat|       3|
|       4|     403890|        25|195.71|2007-05-30|   CANCELLED|  Bihar|       1|
|       5|     273746|        41|125.08|2003-12-13|     SHIPPED| Odisha|       3|
+--------+-----------+----------+------+----------+------------+-------+--------+



In [7]:
df_orders.where(lower(col('order_status'))!='cancelled')\
                .groupBy(col('state'),year(col('order_date')).alias('year'))\
                      .agg(sum('quantity').alias('total_quantity'),sum(col('price')*col('quantity')).alias('revenue'))\
                      .filter(col('revenue')>550000)\
                       .sort(col('revenue').desc())\
                          .show()



+-----------+----+--------------+-----------------+
|      state|year|total_quantity|          revenue|
+-----------+----+--------------+-----------------+
|      Bihar|2016|          3770|568748.1598434448|
|Maharashtra|2006|          3763|564980.0401229858|
| Tamil Nadu|2019|          3716|559179.5512008667|
|      Delhi|2020|          3702|558023.8393783569|
|Uttarakhand|2020|          3735|556238.4301223755|
|      Assam|2006|          3721|555672.2194595337|
|     Odisha|2018|          3686|551947.6004257202|
|Maharashtra|2015|          3654|551441.4298934937|
|      Bihar|2020|          3653|551408.4500656128|
+-----------+----+--------------+-----------------+



                                                                                

In [11]:
spark.sql(
        '''
            select state,year(order_date) as year,sum(quantity) as total_quantity,sum(price*quantity) as revenue
            from sql_orders
            where order_status!='CANCELLED'
            group by state,year(order_date)
            having sum(price*quantity)>550000
            order by revenue desc
        '''
        ).show()



+-----------+----+--------------+-----------------+
|      state|year|total_quantity|          revenue|
+-----------+----+--------------+-----------------+
|      Bihar|2016|          3770|568748.1598434448|
|Maharashtra|2006|          3763|564980.0401229858|
| Tamil Nadu|2019|          3716|559179.5512008667|
|      Delhi|2020|          3702|558023.8393783569|
|Uttarakhand|2020|          3735|556238.4301223755|
|      Assam|2006|          3721|555672.2194595337|
|     Odisha|2018|          3686|551947.6004257202|
|Maharashtra|2015|          3654|551441.4298934937|
|      Bihar|2020|          3653|551408.4500656128|
+-----------+----+--------------+-----------------+



                                                                                

In [12]:
df_orders.withColumn('order_value_category',when((col('price')*col('quantity'))>400,'HIGH_VALUE')\
                .when((col('price')*col('quantity')>250) &((col('price')*col('quantity'))<400),'MEDIUM_VALUE')\
                         .otherwise('LOW_VALUE') )\
                            .groupBy('order_value_category').agg(count('*').alias('category_counts'))\
                             .show()

                                                                                

+--------------------+---------------+
|order_value_category|category_counts|
+--------------------+---------------+
|          HIGH_VALUE|         222239|
|        MEDIUM_VALUE|         361193|
|           LOW_VALUE|         416568|
+--------------------+---------------+



In [10]:
# spark.sql

# IMP

In [15]:
df_orders.groupBy('state')\
            .agg(count('*').alias('total_orders')\
            ,sum(when(lower(col('order_status'))=='delivered' ,1).otherwise(0)).alias('total_delivered')
                ).withColumn('delivery_percentage',(col('total_delivered')/col('total_orders'))*100)\
                .show()

                                                                                

+----------------+------------+---------------+-------------------+
|           state|total_orders|total_delivered|delivery_percentage|
+----------------+------------+---------------+-------------------+
|       Karnataka|       45555|          11455| 25.145428602787838|
|          Odisha|       45719|          11459|  25.06397777729172|
|          Kerala|       45691|          11490|  25.14718434702677|
|      Tamil Nadu|       45638|          11386| 24.948507822428677|
|    Chhattisgarh|       45562|          11395|  25.00987665159563|
|  Andhra Pradesh|       45650|          11410| 24.994523548740418|
|  Madhya Pradesh|       45254|          11338| 25.054138860653204|
|          Punjab|       45479|          11333| 24.919193473911037|
|             Goa|       45090|          11184| 24.803725881570195|
|Himachal Pradesh|       45686|          11434|  25.02736067942039|
|         Haryana|       45500|          11306|  24.84835164835165|
|       Jharkhand|       45490|          11292| 

In [16]:
#spark.sql

In [9]:
df= df_orders.groupBy('customer_id',year('order_date').alias('year'))\
           .agg(sum(col('price')*col('quantity')).alias('yearly_spending'))
df.show()

df_final = df.groupBy('customer_id')\
                    .agg(countDistinct('year').alias('distinct_order_years'),sum('yearly_spending').alias('total_spending'))\
                    .filter(col('distinct_order_years')>5)\
                    .sort(col('customer_id').desc(),col('distinct_order_years').desc())

df_final.show()

                                                                                

+-----------+----+------------------+
|customer_id|year|   yearly_spending|
+-----------+----+------------------+
|     388926|2000|127.56999969482422|
|      73005|2016| 153.1199951171875|
|     785194|2012|  256.739990234375|
|     805598|2017| 513.5399780273438|
|     291673|2016|            344.25|
|     171887|2005|160.08999633789062|
|     602684|2017|199.58999633789062|
|     102973|2019| 324.3599853515625|
|     240379|2012|             179.0|
|     553700|2009| 282.4599914550781|
|     442332|2020| 555.9000244140625|
|     681505|2006|111.55000305175781|
|      97737|2005| 241.9199981689453|
|     750648|2002|187.25999450683594|
|     523868|2006| 398.7799987792969|
|     310804|2012|456.77996826171875|
|     480498|2002| 513.2099914550781|
|     526920|2006| 525.8399658203125|
|      68965|2001|122.69999694824219|
|     269098|2013|420.05999755859375|
+-----------+----+------------------+
only showing top 20 rows





+-----------+--------------------+------------------+
|customer_id|distinct_order_years|    total_spending|
+-----------+--------------------+------------------+
|     999502|                   6|2031.1599884033203|
|     998725|                   6| 2014.740005493164|
|     997612|                   6|2483.6200103759766|
|     991972|                   7|2233.5899963378906|
|     991815|                   6|1869.8900299072266|
|     991339|                   6|2374.6100311279297|
|     990711|                   7|2498.4300537109375|
|     984560|                   6| 2066.500015258789|
|     977196|                   6|2088.9700241088867|
|     974439|                   7| 1318.650016784668|
|     974372|                   6|1762.4599914550781|
|     973954|                   6|1475.9499816894531|
|     973101|                   6|1608.0500259399414|
|     972822|                   7|2224.1200408935547|
|     969360|                   6|1387.9099960327148|
|     965972|               

                                                                                

In [8]:
df_orders.groupBy('customer_id',year('order_date').alias('year'))\
           .agg(sum(col('price')*col('quantity')).alias('yearly_spending'))\
           .groupBy('customer_id')\
                    .agg(countDistinct('year').alias('distinct_order_years'),sum('yearly_spending').alias('total_spending'))\
                    .filter(col('distinct_order_years')>5)\
                    .sort(col('customer_id').desc(),col('distinct_order_years').desc())\
                    .show()



+-----------+--------------------+------------------+
|customer_id|distinct_order_years|    total_spending|
+-----------+--------------------+------------------+
|     999502|                   6|2031.1599884033203|
|     998725|                   6| 2014.740005493164|
|     997612|                   6|2483.6200103759766|
|     991972|                   7|2233.5899963378906|
|     991815|                   6|1869.8900299072266|
|     991339|                   6|2374.6100311279297|
|     990711|                   7|2498.4300537109375|
|     984560|                   6| 2066.500015258789|
|     977196|                   6|2088.9700241088867|
|     974439|                   7| 1318.650016784668|
|     974372|                   6|1762.4599914550781|
|     973954|                   6|1475.9499816894531|
|     973101|                   6|1608.0500259399414|
|     972822|                   7|2224.1200408935547|
|     969360|                   6|1387.9099960327148|
|     965972|               

                                                                                

In [None]:
#spark.sql

In [9]:
df_orders.where(year('order_date')>2005)\
            .groupBy('order_status')\
                .agg(count('*').alias('orders_per_status'),sum(col('price')*col('quantity')).alias('order_value_per_status'))\
                .withColumn('average_order_value',col('order_value_per_status')/col('orders_per_status'))\
                    .show()

                                                                                

+------------+-----------------+----------------------+-------------------+
|order_status|orders_per_status|order_value_per_status|average_order_value|
+------------+-----------------+----------------------+-------------------+
|    RETURNED|            76208|   2.285751473802185E7|  299.9358956805303|
|   DELIVERED|           189682|   5.687318122923279E7|  299.8343608209149|
|   CANCELLED|            38072|  1.1429303846618652E7| 300.20234940687783|
|      PLACED|           265670|   7.953784752929688E7| 299.38588297247287|
|     SHIPPED|           189554|  5.6928588676849365E7| 300.32913405599123|
+------------+-----------------+----------------------+-------------------+



In [None]:
#spark.sql

In [20]:
df_orders.withColumn('state',lower(col('state')))\
            .withColumn('state',trim(col('state')))\
                .groupBy('state').agg(sum('quantity').alias('total_quantity'))\
                .orderBy(col('total_quantity').desc())\
                        .take(5)

                                                                                

[Row(state='kerala', total_quantity=91678),
 Row(state='himachal pradesh', total_quantity=91537),
 Row(state='odisha', total_quantity=91485),
 Row(state='bihar', total_quantity=91474),
 Row(state='andhra pradesh', total_quantity=91258)]

In [None]:
#spark.sql

+----------------+-----+----------------------------+
|           state|total|count((bulk_order_flag = Y))|
+----------------+-----+----------------------------+
|       Karnataka|45555|                       45555|
|          Odisha|45719|                       45719|
|          Kerala|45691|                       45691|
|      Tamil Nadu|45638|                       45638|
|    Chhattisgarh|45562|                       45562|
|  Andhra Pradesh|45650|                       45650|
|  Madhya Pradesh|45254|                       45254|
|          Punjab|45479|                       45479|
|             Goa|45090|                       45090|
|Himachal Pradesh|45686|                       45686|
|         Haryana|45500|                       45500|
|       Jharkhand|45490|                       45490|
|         Gujarat|45340|                       45340|
|           Delhi|45404|                       45404|
|       Rajasthan|45119|                       45119|
|           Assam|45574|    

In [26]:
df_orders.withColumn('bulk_order_flag',when(col('quantity')>=3,'Y').otherwise('N'))\
            .groupBy('state').agg(count('*').alias('total')\
                                ,sum(when(col('bulk_order_flag')=='Y',1).otherwise(0)).alias('bulk_order_count')\
                                 ).withColumn('bulk_percentage',(col('bulk_order_count')/col('total'))*100)\
                                .show()

+----------------+-----+----------------+------------------+
|           state|total|bulk_order_count|   bulk_percentage|
+----------------+-----+----------------+------------------+
|       Karnataka|45555|           15091|  33.1269893535287|
|          Odisha|45719|           15201| 33.24875872175682|
|          Kerala|45691|           15362| 33.62150095204745|
|      Tamil Nadu|45638|           15257|33.430474604496254|
|    Chhattisgarh|45562|           15101| 33.14384794346166|
|  Andhra Pradesh|45650|           15179| 33.25082146768894|
|  Madhya Pradesh|45254|           15036| 33.22579219516506|
|          Punjab|45479|           14874| 32.70520459992524|
|             Goa|45090|           15200| 33.71035706365048|
|Himachal Pradesh|45686|           15194|33.257453049074115|
|         Haryana|45500|           15222| 33.45494505494506|
|       Jharkhand|45490|           15173|33.354583424928556|
|         Gujarat|45340|           15125|33.359064843405385|
|           Delhi|45404|

In [None]:
#spark.sql

In [7]:
df_orders.groupBy('state')\
            .agg(max('price').alias('max_price'),min('price').alias('min_price'))\
            .withColumn('price_diff',col('max_price')-col('min_price'))\
            .filter(col('price_diff')<100)\
                .show()



+----------------+---------+---------+----------+
|           state|max_price|min_price|price_diff|
+----------------+---------+---------+----------+
|  Madhya Pradesh|   199.99|   100.01|     99.98|
|Himachal Pradesh|   199.99|    100.0| 99.990005|
+----------------+---------+---------+----------+



                                                                                

In [None]:
#spark.sql

In [46]:
#step-1 orders placed after 2005,order_status not cancelled
#step-2 save the overall avg order value
#step-3 groupby state
df_base= df_orders.filter( (year(col('order_date')).alias('year')>2005) \
                 &(col('order_status')!='CANCELLED')
                )

#overal_avg_order_value = df_base.agg((sum(col('price'))*sum(col('quantity')))/df_base.count()).collect()[0][0]

overal_avg_order_value = df_base.agg((sum(col('price'))*sum(col('quantity')))/count('*')).collect()[0][0]
                                     
print('overal_avg_order_value:',overal_avg_order_value)



df_state= df_base.groupBy('state')\
            .agg(countDistinct(year(col('order_date'))).alias('distinct_years')\
                ,count('*').alias('total_order_by_state')\
                ,sum(col('price')*col('quantity')).alias('total_revenue')\
                 ,(sum(col('price')*col('quantity'))/count('*')).alias('avg_order_value_state')\
                )
df_state.show()

df_final = df_state.filter(col('distinct_years')>=3)\
                           .filter(col('avg_order_value_state')>overal_avg_order_value)\
                           .orderBy(col('total_revenue').desc())
df_final.show()


                                                                                

overal_avg_order_value: 216229310.32642326


                                                                                

+----------------+--------------+--------------------+-----------------+---------------------+
|           state|distinct_years|total_order_by_state|    total_revenue|avg_order_value_state|
+----------------+--------------+--------------------+-----------------+---------------------+
|       Karnataka|            19|               32915|9852881.308052063|    299.3431963558275|
|          Odisha|            19|               32930|9886126.528282166|     300.216414463473|
|          Kerala|            19|               32940|9914585.815734863|     300.989247593651|
|      Tamil Nadu|            19|               33012|9877939.402801514|    299.2226888041171|
|    Chhattisgarh|            19|               32888| 9862969.16571045|   299.89568127312236|
|  Andhra Pradesh|            19|               32980|9877574.530303955|   299.50195664960444|
|  Madhya Pradesh|            19|               32533| 9743285.99861145|    299.4893184954185|
|          Punjab|            19|               32



+-----+--------------+--------------------+-------------+---------------------+
|state|distinct_years|total_order_by_state|total_revenue|avg_order_value_state|
+-----+--------------+--------------------+-------------+---------------------+
+-----+--------------+--------------------+-------------+---------------------+



                                                                                

In [47]:
spark.stop()