**E-Commerce_Analysis_PySpark**

In [None]:
!pip install pyspark

In [None]:
!pip install pygal

In [None]:
!pip install cairosvg

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import ipywidgets as widgets
from ipywidgets import interact, interactive
import pygal
# Create (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    .appName("E-Commerce Analysis")
    .getOrCreate()
)

In [None]:
# Load the order reviews and parse their two date columns as timestamps.
# The reader used to be given option("delimeter", "|"): a misspelling of
# "delimiter" that Spark ignores, so the file has always been parsed with the
# default comma separator. The no-op option is removed; parsing is unchanged.
# NOTE(review): no multiLine/quote handling is configured - if review comments
# contain embedded newlines, records will be split; confirm against the raw CSV.
reviews_df = (
    spark.read
    .csv('../input/brazilian-ecommerce/olist_order_reviews_dataset.csv', header=True)
    # Rows without either date cannot be converted below, so discard them first.
    .na.drop(subset=['review_creation_date', 'review_answer_timestamp'])
    .withColumn('review_creation_date', to_timestamp('review_creation_date'))
    .withColumn('review_answer_timestamp', to_timestamp('review_answer_timestamp'))
)
reviews_df.printSchema()
reviews_df.show()

In [None]:
# Load the order payments, discarding rows that contain any null.
# The misspelled option("delimeter", "|") was ignored by Spark (the file is read
# with the default comma separator), so it is removed; parsing is unchanged.
payment_df = (
    spark.read
    .csv('../input/brazilian-ecommerce/olist_order_payments_dataset.csv', header=True)
    .na.drop()
)
payment_df.printSchema()
payment_df.show()

In [None]:
# Load the orders and convert every date column to a timestamp in place.
# Fixes relative to the previous version:
#   * 'order_delivered_carrier_data' / 'order_estimed_delivery_date' were typos
#     that added two extra columns and left the real columns as strings;
#   * the misspelled option("delimeter", "|") was ignored by Spark and is removed
#     (the file is still read with the default comma separator).
orders_df = spark.read.csv('../input/brazilian-ecommerce/olist_orders_dataset.csv', header=True)
for date_column in (
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
):
    orders_df = orders_df.withColumn(date_column, to_timestamp(date_column))
orders_df.printSchema()
orders_df.show()

In [None]:
# Load the customers, discarding rows that contain any null.
# The misspelled option("delimeter", "|") was ignored by Spark (the file is read
# with the default comma separator), so it is removed; parsing is unchanged.
customers_df = (
    spark.read
    .csv('../input/brazilian-ecommerce/olist_customers_dataset.csv', header=True)
    .na.drop()
)
customers_df.printSchema()
customers_df.show()

In [None]:
# Load the products, discarding rows that contain any null.
# The misspelled option("delimeter", "|") was ignored by Spark (the file is read
# with the default comma separator), so it is removed; parsing is unchanged.
products_df = (
    spark.read
    .csv('../input/brazilian-ecommerce/olist_products_dataset.csv', header=True)
    .na.drop()
)
products_df.printSchema()
products_df.show()

In [None]:
# Load the order items, drop rows with nulls, and type the two columns that the
# later cells rely on (timestamp for the shipping limit, int for the item id).
# The misspelled option("delimeter", "|") was ignored by Spark (the file is read
# with the default comma separator), so it is removed; parsing is unchanged.
# Every other column - including price and freight_value - is still a string,
# because the CSV is read without a schema.
item_df = (
    spark.read
    .csv('../input/brazilian-ecommerce/olist_order_items_dataset.csv', header=True)
    .dropna()
    .withColumn('shipping_limit_date', to_timestamp('shipping_limit_date'))
    .withColumn('order_item_id', col('order_item_id').cast('int'))
)
item_df.printSchema()
item_df.show()


In [None]:
# Load the sellers, discarding rows that contain any null.
# The misspelled option("delimeter", "|") was ignored by Spark (the file is read
# with the default comma separator), so it is removed; parsing is unchanged.
sellers_df = (
    spark.read
    .csv('../input/brazilian-ecommerce/olist_sellers_dataset.csv', header=True)
    .dropna()
)
sellers_df.printSchema()
sellers_df.show()

In [None]:
# Geolocation points restricted to zip-code prefixes that have a seller, with
# the city name taken from the seller table instead of the geolocation file.
# The raw file is kept under its own name so re-running the cell starts from the
# same input, and the join condition is passed to join() directly rather than
# filtering a condition-less join afterwards.
geolocation_raw = spark.read.csv("../input/brazilian-ecommerce/olist_geolocation_dataset.csv", header=True)
geolocation_df = (
    sellers_df
    .join(
        geolocation_raw,
        sellers_df['seller_zip_code_prefix'] == geolocation_raw['geolocation_zip_code_prefix'],
        'inner',
    )
    .select(
        'geolocation_zip_code_prefix',
        'geolocation_lat',
        'geolocation_lng',
        sellers_df['seller_city'].alias('geolocation_city'),
        'geolocation_state',
    )
)
geolocation_df.show()

**Top Selling Product**

In [None]:
# Portuguese -> English category names; reused by several later cells.
translation_df = spark.read.csv(
    "../input/brazilian-ecommerce/product_category_name_translation.csv",
    header=True,
)

# Largest order_item_id seen for each product ...
order_item_grpby = item_df.groupBy('product_id').agg({'order_item_id': 'max'})

# ... rolled up to the largest value per English category name.
# NOTE(review): this ranks categories by max(order_item_id), not by the number
# of items sold - confirm that this is the intended "top selling" metric.
top_selling = (
    order_item_grpby
    .join(products_df, ['product_id'])
    .join(translation_df, ['product_category_name'])
    .groupBy('product_category_name_english')
    .agg({'max(order_item_id)': 'max'})
    .withColumnRenamed('max(max(order_item_id))', 'order_item')
    .withColumnRenamed('product_category_name_english', 'Product_Name')
)
top_selling.orderBy('order_item', ascending=False).show()

**Total Revenue Generated Between Two Dates (dd/mm/yyyy)**

In [None]:
# Date pickers supplying the lower and upper bound of the revenue query.
from_date = widgets.DatePicker(description='Start Date')
To_date = widgets.DatePicker(description='End Date')
def Total_revenue(from_date,To_date):
    """Show the summed payment value of delivered orders purchased in a date range.

    Args:
        from_date: first purchase date to include (inclusive); None when no
            start date has been picked yet.
        To_date: last purchase date to include (inclusive); None when no end
            date has been picked yet.

    Prints a one-row table with a single 'Total_revenue' column. Nothing is
    returned.
    """
    # Comparing against None makes the filter null and the total null, so ask
    # for the missing input instead of printing an empty result.
    if from_date is None or To_date is None:
        print('Select both a start date and an end date.')
        return

    delivered = orders_df.filter(orders_df.order_status == 'delivered')
    revenue = (
        delivered.join(payment_df, ['order_id'])
        .withColumn('Date', to_date('order_purchase_timestamp'))
        .filter(col('Date').between(from_date, To_date))
        .agg({'payment_value': 'sum'})
        .withColumnRenamed('sum(payment_value)', 'Total_revenue')
    )
    # A 32-bit float keeps only ~7 significant digits, which drops the cents on
    # large totals; a two-decimal DECIMAL keeps the amount exact for display.
    revenue = revenue.withColumn('Total_revenue', revenue['Total_revenue'].cast('decimal(18,2)'))
    revenue.show()

# Re-run Total_revenue whenever either picker changes; its printed table is
# captured in `out` and laid out to the right of the pickers.
out = widgets.interactive_output(
    Total_revenue,
    {'from_date': from_date, 'To_date': To_date},
)
widgets.HBox([widgets.VBox([from_date, To_date]), out])

Total Orders by Product Category

In [None]:
# Number of order-item rows per English product category.
product_category = (
    item_df
    .join(products_df, ['product_id'])
    .join(translation_df, ['product_category_name'])
    .groupBy('product_category_name_english')
    .count()
    .withColumnRenamed('count', 'Total_item')
    .withColumnRenamed('product_category_name_english', 'Product_Category')
)
product_category.show()

Total Number Of Customers By Region

In [None]:
# Customers per state. show() returns None, so the DataFrame is bound first and
# displayed in a second step; the name previously ended up holding None.
Customer_by_region_state = (
    customers_df.groupBy('customer_state')
    .count()
    .withColumnRenamed('count', 'No_of_customers')
)
Customer_by_region_state.show()

In [None]:
# Customers per city, largest first. show() returns None, so the DataFrame is
# bound first and displayed in a second step; the name previously held None.
Customer_by_region_city = (
    customers_df.groupBy('customer_city')
    .count()
    .orderBy('count', ascending=False)
    .withColumnRenamed('count', 'No_of_customers')
)
Customer_by_region_city.show()

Revenue Generated Annually

In [None]:
@interact(Year=[('2018',2018),('2017',2017),('2016',2016)])
def Annual(Year):
  """Show the summed payment value of delivered orders purchased in `Year`.

  Args:
      Year: calendar year (int) chosen in the dropdown.

  Prints a one-row table with a single 'Total_Payment' column, truncated to an
  integer. Nothing is returned.
  """
  delivered = orders_df.filter(orders_df.order_status == 'delivered')
  # The year is read straight from the purchase timestamp. The earlier detour
  # through to_timestamp('Date', 'dd/MM/yyyy') applied a pattern that does not
  # match the date column, and the column drop list named a non-existent
  # 'order_removed_at'; neither is needed for the aggregate.
  Annual_revenue = (
      delivered.join(payment_df, ['order_id'])
      .withColumn('year', year('order_purchase_timestamp'))
      .filter(col('year') == Year)
      .agg({'payment_value': 'sum'})
      .withColumnRenamed('sum(payment_value)', 'Total_Payment')
  )
  Annual_revenue = Annual_revenue.withColumn('Total_Payment', Annual_revenue['Total_Payment'].cast('int'))
  Annual_revenue.show()




Most Valued Customer and Salesman

In [None]:
# Delivered orders only; reused by later cells.
orders_df_filter = orders_df.filter(orders_df.order_status == 'delivered')

# One row per delivered order item, stripped of the columns that are not needed
# for counting.
unused_columns = (
    'order_status',
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
    'shipping_limit_date',
    'price',
    'freight_value',
)
Most_fav = orders_df_filter.join(item_df, ['order_id']).drop(*unused_columns)

# Items bought per customer, biggest buyers first.
Most_fav_customer = (
    Most_fav.groupBy('Customer_id')
    .count()
    .withColumnRenamed('count', 'total_product_buy')
)
Most_fav_customer.orderBy('total_product_buy', ascending=False).show()

In [None]:
# Items sold per seller, biggest sellers first.
Most_fav_seller = (
    Most_fav.groupBy('seller_id')
    .count()
    .withColumnRenamed('count', 'total_product_sold')
)
Most_fav_seller.orderBy('total_product_sold', ascending=False).show()

Product Sale Over Time

In [None]:
from IPython.display import display, HTML

# HTML page used to embed a rendered pygal chart; the chart markup is inserted
# at the {pygal_render} placeholder with str.format.
# Fix: the second <script> tag had a stray extra double quote after its src
# attribute, which made the markup malformed.
# NOTE(review): the first script is loaded over plain http and may be blocked as
# mixed content when the notebook is served over https - confirm.
base_html = """
<!DOCTYPE html>
<html>
  <head>
  <script type="text/javascript" src="http://kozea.github.com/pygal.js/javascripts/svg.jquery.js"></script>
  <script type="text/javascript" src="https://kozea.github.io/pygal.js/2.0.x/pygal-tooltips.min.js"></script>
  </head>
  <body>
    <figure>
      {pygal_render}
    </figure>
  </body>
</html>
"""

In [None]:
# Distinct English category names, used as the dropdown options below.
list_product = [
    value
    for row in (
        translation_df
        .groupBy('product_category_name_english')
        .count()
        .select('product_category_name_english')
        .collect()
    )
    for value in row
]

@interact(Product=[str(x) for x in list_product])
def product(Product):
  """Plot and tabulate the number of items sold per day for one category.

  Args:
      Product: English product category name chosen in the dropdown.

  Displays a pygal line chart inline and prints a sample of the daily counts.
  Nothing is returned.
  """
  product_sales = (
      orders_df.join(item_df, ['order_id'])
      .join(products_df, ['product_id'])
      .join(translation_df, ['product_category_name'])
      .select('product_category_name_english', 'order_purchase_timestamp')
      .withColumn('Date', to_date('order_purchase_timestamp'))
  )
  product_sales = product_sales.filter(product_sales.product_category_name_english == Product)
  # Keep roughly 20% of the days so the chart stays readable.
  product_sales = product_sales.groupby('Date').count().sample(0.2)

  # Collect once, sorted by date: two separate collects over an unsorted sample
  # gave no guarantee that dates and counts lined up, and left the x axis in
  # arbitrary order.
  daily_rows = product_sales.orderBy('Date').collect()
  list_date = [str(row['Date']) for row in daily_rows]
  list_count = [row['count'] for row in daily_rows]

  # 'x_lable_rotation' was a misspelling of 'x_label_rotation' and had no
  # effect. The equally misspelled 'show_minor_x_lable' is dropped rather than
  # corrected. NOTE(review): re-add show_minor_x_labels=False together with
  # major labels if hiding minor labels is wanted.
  chart = pygal.Line(width=1200, height=400, explicit_size=True, x_label_rotation=20)
  chart.x_labels = list_date  # a real list, not a one-shot map() iterator
  chart.add(Product, list_count)

  # render() returns bytes unless is_unicode=True, and an HTML object created
  # inside a function is shown only when passed to display(). This inline
  # display replaces render_in_browser(), which opened a browser on the machine
  # running the kernel.
  display(HTML(base_html.format(pygal_render=chart.render(is_unicode=True))))
  product_sales.sample(0.1).orderBy('Date', ascending=True).show()



Total Orders by Region/City

In [None]:
# Customers that have at least one delivered order (semi join keeps only the
# customer columns), counted per city.
Order_by_region = customers_df.join(orders_df_filter, ['customer_id'], 'leftsemi')
Customer_by_region_city = Order_by_region.groupBy('customer_city').count()

# Full ranking, then a small random sample of the same counts.
ranked_cities = Customer_by_region_city.orderBy('count', ascending=False)
ranked_cities.withColumnRenamed('count', 'Total_orders').show()
Customer_by_region_city.sample(0.01).show()

Most Reviewed Product

In [None]:
# Reviews of delivered order items that have both a title and a message,
# counted per English product category.
reviewed_items = (
    item_df
    .join(orders_df_filter, ['order_id'], 'leftsemi')
    .join(products_df, ['product_id'])
    .join(reviews_df, ['order_id'])
    .join(translation_df, ['product_category_name'])
    .select(
        'review_id',
        'order_id',
        'product_id',
        'product_category_name_english',
        'review_comment_title',
        'review_comment_message',
    )
    .dropna()
)
# A single count per category equals the earlier "count per (category, title),
# then sum per category" and avoids the second aggregation.
Reviewed_product = (
    reviewed_items
    .groupBy('product_category_name_english')
    .count()
    .withColumnRenamed('count', 'Total_review')
)
Reviewed_product.orderBy('Total_review', ascending=False).show()

Min and Max Priced Products

In [None]:
# Cheapest and most expensive item price per English product category.
# item_df is read from CSV without a schema, so 'price' is a string there and
# min/max would compare it lexicographically (e.g. '999.00' > '1000.00').
# Cast it to a number before aggregating.
priced_items = item_df.withColumn('price', col('price').cast('double'))

# Lowest price per category, cheapest categories first.
minmax = priced_items.groupBy('product_id').agg({'price': 'min'})
product_priced = (
    minmax
    .join(products_df, ['product_id'])
    .join(translation_df, ['product_category_name'])
    .groupBy('product_category_name_english')
    .agg({'min(price)': 'min'})
    .withColumnRenamed('min(min(price))', 'Price')
    .withColumnRenamed('product_category_name_english', 'Product_Name')
)
product_priced.orderBy('Price', ascending=True).show()

# Highest price per category, most expensive categories first.
minmax = priced_items.groupBy('product_id').agg({'price': 'max'})
product_priced = (
    minmax
    .join(products_df, ['product_id'])
    .join(translation_df, ['product_category_name'])
    .groupBy('product_category_name_english')
    .agg({'max(price)': 'max'})
    .withColumnRenamed('max(max(price))', 'Price')
    .withColumnRenamed('product_category_name_english', 'Product_Name')
)
product_priced.orderBy('Price', ascending=False).show()

Returning Customers to Understand Customer Loyalty

In [None]:
# Delivered orders joined to their items, reduced to the id columns.
Return = orders_df.filter(orders_df.order_status == 'delivered')
columns_to_discard = (
    'order_status',
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
    'shipping_limit_date',
    'price',
    'freight_value',
)
Loyalty = Return.join(item_df, ['order_id']).drop(*columns_to_discard)

# Items bought per customer.
Loyalty = (
    Loyalty.groupBy('Customer_id')
    .count()
    .withColumnRenamed('count', 'total_product_buy')
)

# Ascending sort + tail(1) yields the customer with the highest count.
top_customer = Loyalty.orderBy('total_product_buy').tail(1)
print("The Most Valuable Customer is: ", top_customer)
Loyalty.orderBy('total_product_buy', ascending=False).show()

Number of Orders and Number of Customers

In [None]:
# Orders that were neither canceled nor unavailable, joined to their items and
# counted per customer, largest first.
# NOTE(review): the join yields one row per order item, so 'Total_orders' counts
# item rows rather than distinct orders - confirm this is the intended metric.
active_orders = orders_df.filter(
    (orders_df.order_status != 'canceled') & (orders_df.order_status != 'unavailable')
)
# The earlier second groupby('customer_id').sum('count') re-aggregated rows that
# were already unique per customer; a rename gives the same table.
no_orders = (
    active_orders
    .join(item_df, ['order_id'])
    .groupBy('customer_id')
    .count()
    .withColumnRenamed('count', 'Total_orders')
    .orderBy('Total_orders', ascending=False)
)

# Overall totals. show() returns None, so the summary DataFrame is bound first
# and displayed separately (the name previously ended up holding None).
no_orders_no_customers = (
    no_orders
    .agg({'customer_id': 'count', 'Total_orders': 'sum'})
    .withColumnRenamed('sum(Total_orders)', 'Total_orders')
    .withColumnRenamed('count(customer_id)', 'Total_customer')
)
no_orders_no_customers.show()

80-20 Analysis of Product vs Sales/Profit

In [None]:
# Order items tagged with the first day of their purchase month ('MY').
# trunc(..., 'month') yields that date directly, replacing the earlier
# "format month, concatenate 'year-month' string, cast back to date" round trip.
month_yr = (
    orders_df
    .select('order_id', trunc('order_purchase_timestamp', 'month').alias('MY'))
    .join(item_df, ['order_id'])
)

In [None]:
# Per-item total (price + freight), then monthly sums and the monthly row count.
items_with_total = month_yr.drop('order_item_id').withColumn(
    'Total_Price', month_yr['price'] + month_yr['freight_value']
)
monthly_totals = items_with_total.groupBy('MY').agg(
    {'order_id': 'count', 'price': 'sum', 'freight_value': 'sum', 'Total_Price': 'sum'}
)

# Per-row averages for each month, followed by friendlier column names.
rows_in_month = monthly_totals['count(order_id)']
Analysis = (
    monthly_totals
    .withColumn('price_per_product', monthly_totals['sum(Total_price)'] / rows_in_month)
    .withColumn('freight_per_product', monthly_totals['sum(freight_value)'] / rows_in_month)
    .withColumnRenamed('sum(Total_price)', 'Total_price')
    .withColumnRenamed('count(order_id)', 'Order_id')
    .withColumnRenamed('sum(freight_value)', 'freight_value')
    .withColumnRenamed('sum(price)', 'price')
    .orderBy('MY', ascending=True)
)

# Add the calendar year and truncate the monthly price sum to an integer.
Analysis = (
    Analysis
    .withColumn("Year", year('MY'))
    .withColumn('price', Analysis['price'].cast('int'))
)

In [None]:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Monthly price total (line, left axis) against the monthly row count
# (bars, right axis) for 2017.
Analysis_80_20 = Analysis.filter(Analysis['Year'] == 2017)

# One sorted collect keeps the three series aligned and runs a single Spark job
# instead of three.
monthly_rows = (
    Analysis_80_20
    .select('MY', 'price', 'order_id')
    .orderBy('MY')
    .collect()
)
list_date_a = [row[0] for row in monthly_rows]
list_date_count_a = [row[1] for row in monthly_rows]
list_orders = [row[2] for row in monthly_rows]
month_labels = [str(month) for month in list_date_a]

# make_subplots already returns the figure; the earlier separate go.Figure()
# was overwritten immediately and has been removed.
fig = make_subplots(specs=[[{'secondary_y': True}]])
fig.add_trace(
    go.Scatter(
        x=month_labels,
        y=list_date_count_a,
        name='Price',
        text=list_date_count_a,
    ),
    secondary_y=False,
)
fig.add_trace(
    go.Bar(
        x=month_labels,
        y=list_orders,
        name='order',
        text=list_orders,
    ),
    secondary_y=True,
)
fig.update_layout(
    autosize=False,
    width=1000,
    height=500,
    title='Orders and Price',
    xaxis_title=' Year-Month',
    yaxis_title='Price',
)
fig.update_yaxes(automargin=True, title_text='Product count', secondary_y=True)
fig.show()

Shipment Aging of Each Order

In [None]:
# Delivered order items with their English category name.
delivered_items = (
    orders_df_filter
    .join(item_df, ['order_id'])
    .join(products_df, ['product_id'])
    .join(translation_df, ['product_category_name'])
)

# Days between order approval and hand-over to the carrier, per order item.
aging_days = datediff('order_delivered_carrier_date', 'order_approved_at')
Shipment_Aging = delivered_items.withColumn('Shipment_Aging_days', aging_days).select(
    'product_category_name_english',
    'order_id',
    'customer_id',
    'seller_id',
    'Shipment_Aging_days',
)

# Show a small random sample of the result.
Shipment_Aging.sample(0.001).show()