In [1]:
import findspark
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [2]:
spark = SparkSession \
.builder \
.master("local[2]") \
.appName("PySpark Dataframe Homework") \
.config("spark.executer.memory","2g") \
.config("spark.driver.memory","1g") \
.getOrCreate()

# .config("spark.storage.memoryFraction","0.1") \
# .config("park.shuffle.memoryFraction","0.0") \

In [3]:
data_path = "file:///home/train/datasets/retail_db"
output_path = "file:///home/train/pyspark_output_data"

In [4]:
categories = spark.read.option("inferSchema",True).csv(data_path+"/categories.csv", header=True)
categories.show(3)

+----------+--------------------+-------------------+
|categoryId|categoryDepartmentId|       categoryName|
+----------+--------------------+-------------------+
|         1|                   2|           Football|
|         2|                   2|             Soccer|
|         3|                   2|Baseball & Softball|
+----------+--------------------+-------------------+
only showing top 3 rows



In [5]:
categories.printSchema()

root
 |-- categoryId: integer (nullable = true)
 |-- categoryDepartmentId: integer (nullable = true)
 |-- categoryName: string (nullable = true)



In [6]:
products = spark.read.option("inferSchema",True).csv(data_path+"/products.csv", header=True)
products.show(3)

+---------+-----------------+--------------------+------------------+------------+--------------------+
|productId|productCategoryId|         productName|productDescription|productPrice|        productImage|
+---------+-----------------+--------------------+------------------+------------+--------------------+
|        1|                2|Quest Q64 10 FT. ...|              null|       59.98|http://images.acm...|
|        2|                2|Under Armour Men'...|              null|      129.99|http://images.acm...|
|        3|                2|Under Armour Men'...|              null|       89.99|http://images.acm...|
+---------+-----------------+--------------------+------------------+------------+--------------------+
only showing top 3 rows



In [7]:
products.printSchema()

root
 |-- productId: integer (nullable = true)
 |-- productCategoryId: integer (nullable = true)
 |-- productName: string (nullable = true)
 |-- productDescription: string (nullable = true)
 |-- productPrice: double (nullable = true)
 |-- productImage: string (nullable = true)



In [8]:
orders = spark.read.option("inferSchema",True).csv(data_path+"/orders.csv", header=True)
orders.show(3)

+-------+--------------------+---------------+---------------+
|orderId|           orderDate|orderCustomerId|    orderStatus|
+-------+--------------------+---------------+---------------+
|      1|2013-07-25 00:00:...|          11599|         CLOSED|
|      2|2013-07-25 00:00:...|            256|PENDING_PAYMENT|
|      3|2013-07-25 00:00:...|          12111|       COMPLETE|
+-------+--------------------+---------------+---------------+
only showing top 3 rows



In [24]:
orders.printSchema()

root
 |-- orderId: integer (nullable = true)
 |-- orderDate: string (nullable = true)
 |-- orderCustomerId: integer (nullable = true)
 |-- orderStatus: string (nullable = true)



In [9]:
order_items = spark.read.option("inferSchema",True).csv(data_path+"/order_items.csv", header=True)
order_items.show(3)

+-------------+----------------+------------------+-----------------+-----------------+---------------------+
|orderItemName|orderItemOrderId|orderItemProductId|orderItemQuantity|orderItemSubTotal|orderItemProductPrice|
+-------------+----------------+------------------+-----------------+-----------------+---------------------+
|            1|               1|               957|                1|           299.98|               299.98|
|            2|               2|              1073|                1|           199.99|               199.99|
|            3|               2|               502|                5|            250.0|                 50.0|
+-------------+----------------+------------------+-----------------+-----------------+---------------------+
only showing top 3 rows



In [25]:
order_items.printSchema()

root
 |-- orderItemName: integer (nullable = true)
 |-- orderItemOrderId: integer (nullable = true)
 |-- orderItemProductId: integer (nullable = true)
 |-- orderItemQuantity: integer (nullable = true)
 |-- orderItemSubTotal: double (nullable = true)
 |-- orderItemProductPrice: double (nullable = true)



# 3.1. `order_items` tablosunda kaç tane tekil `orderItemOrderId` vardır sayısını bulunuz.

In [12]:
orders.select("orderId").distinct().count()

68883

# 3.2. `orders` ve `order_items` tablolarında kaç satır vardır bulunuz.

In [13]:
orders.count()

68883

In [23]:
order_items.count()

172198

# Cevap 3.3. ve 3.4 

In [10]:
order_items.select("orderItemOrderId").distinct().count()

57431

In [11]:
order_items.count()

172198

In [None]:
# Yukarıdaki rakamlardan orders'ın siparişler olduğunu ve her siparişte birden fazla ürün olduğu için 
# hangi siparişten hangi ürünlerin bulunduğunu order_items tablosundan anlıyoruz. 
# Şimdi bizden istenen iş en çok iptal edilen ürün ve kategorileri bulmak olduğu için öncelikle siparişler
# ile sipariş detaylarını birleştirmeliyiz. 
# Daha sonra bu birleşimden iptal olan siparişleri filtrelemeliyiz.

# Joins

In [14]:
# Products ve Categories birleştirme. Sadece ihtiyacımız olanları seçiyoruz.
cat_products = products.join(categories, products['productCategoryId'] == categories['categoryId']) \
.select("productId","productName", "categoryName")
cat_products.limit(5).toPandas().head()

Unnamed: 0,productId,productName,categoryName
0,1,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,Soccer
1,2,Under Armour Men's Highlight MC Football Clea,Soccer
2,3,Under Armour Men's Renegade D Mid Football Cl,Soccer
3,4,Under Armour Men's Renegade D Mid Football Cl,Soccer
4,5,Riddell Youth Revolution Speed Custom Footbal,Soccer


In [15]:
# orders ve order_items birleştirme
orders_and_items = order_items.join(orders, order_items['orderItemOrderId'] == orders['orderId']) \
.select("orderId","orderItemProductId","orderItemSubTotal","orderStatus")
orders_and_items.limit(5).toPandas().head()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus
0,1,957,299.98,CLOSED
1,2,1073,199.99,PENDING_PAYMENT
2,2,502,250.0,PENDING_PAYMENT
3,2,403,129.99,PENDING_PAYMENT
4,4,897,49.98,CLOSED


In [16]:
# Ceategory-Products birleşimini iptal edilmiş siparişlerle ürün id üzerinden birleştirelim
final_table = orders_and_items.join(cat_products, orders_and_items['orderItemProductId'] == cat_products['productId'])
final_table.limit(25).toPandas().head(25)

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,productId,productName,categoryName
0,57760,858,199.99,PENDING_PAYMENT,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
1,57847,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
2,58071,858,199.99,PENDING,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
3,58170,858,199.99,PENDING,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
4,58585,858,199.99,CANCELED,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
5,58589,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
6,58695,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
7,58774,858,199.99,PENDING,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
8,58797,858,199.99,COMPLETE,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs
9,58926,858,199.99,PENDING_PAYMENT,858,GolfBuddy VT3 GPS Watch,Kids' Golf Clubs


# 3.3. Toplam satış tutarı bakımından en çok iptal edilen (azalan sıra) ürünleri lokal diske parquet formatında yazınız.

In [17]:
#İş ihtiyacı-1: Toplam satış tutarı bakımından en çok iptal edilen ürünleri diske parquet formatında yazınız
most_cancelled_products = final_table.filter("orderStatus == 'CANCELED'") \
.groupBy("productName").agg(F.sum("orderItemSubTotal").alias("Totalprice")) \
.orderBy(F.desc("TotalPrice"))
most_cancelled_products.limit(10).toPandas().head()

Unnamed: 0,productName,Totalprice
0,Field & Stream Sportsman 16 Gun Fire Safe,134393.28
1,Perfect Fitness Perfect Rip Deck,85785.7
2,Nike Men's Free 5.0+ Running Shoe,80691.93
3,Diamondback Women's Serene Classic Comfort Bi,80094.66
4,Pelican Sunstream 100 Kayak,66196.69


In [18]:
# coalesce parçaları birleştirir tek dosya oluşturur.
most_cancelled_products.coalesce(1) \
.write.mode("overwrite").parquet(output_path+"/most_cancelled_products_parquet")

In [19]:
# Kontrol
spark.read.parquet(output_path+"/most_cancelled_products_parquet") \
.limit(10).toPandas().head(10)

Unnamed: 0,productName,Totalprice
0,Field & Stream Sportsman 16 Gun Fire Safe,134393.28
1,Perfect Fitness Perfect Rip Deck,85785.7
2,Nike Men's Free 5.0+ Running Shoe,80691.93
3,Diamondback Women's Serene Classic Comfort Bi,80094.66
4,Pelican Sunstream 100 Kayak,66196.69
5,Nike Men's Dri-FIT Victory Golf Polo,65750.0
6,Nike Men's CJ Elite 2 TD Football Cleat,60705.33
7,O'Brien Men's Neoprene Life Vest,58126.74
8,Under Armour Girls' Toddler Spine Surge Runni,26153.46
9,LIJA Women's Eyelet Sleeveless Golf Polo,2145.0


# 3.4. Toplam satış tutarı bakımından en çok iptal edilen (azalan sıra) kategorileri local diske parquet formatında yazınız.

In [20]:
#İş ihtiyacı-2: Toplam satış tutarı bakımından en çok iptal edilen kategorileri diske parquet formatında yazınız
most_cancelled_cats = final_table.filter("orderStatus == 'CANCELED'") \
.groupBy("categoryName").agg(F.sum("orderItemSubTotal").alias("Totalprice")) \
.orderBy(F.desc("TotalPrice"))
most_cancelled_cats.limit(10).toPandas().head(10)

Unnamed: 0,categoryName,Totalprice
0,Fishing,134393.28
1,Cleats,85785.7
2,Cardio Equipment,81351.93
3,Camping & Hiking,80094.66
4,Water Sports,66196.69
5,Women's Apparel,65750.0
6,Men's Footwear,60705.33
7,Indoor/Outdoor Games,58126.74
8,Shop By Sport,27423.44
9,Electronics,5685.5


In [21]:
# coalesce parçaları birleştirir tek dosya oluşturur.
most_cancelled_cats.coalesce(1) \
.write.mode("overwrite").parquet(output_path+"/most_cancelled_cats_parquet")

In [22]:
# Kontrol
spark.read.parquet(output_path+"/most_cancelled_cats_parquet") \
.limit(10).toPandas().head(10)

Unnamed: 0,categoryName,Totalprice
0,Fishing,134393.28
1,Cleats,85785.7
2,Cardio Equipment,81351.93
3,Camping & Hiking,80094.66
4,Water Sports,66196.69
5,Women's Apparel,65750.0
6,Men's Footwear,60705.33
7,Indoor/Outdoor Games,58126.74
8,Shop By Sport,27423.44
9,Electronics,5685.5


# 3.5. En yüksek ortalama satış hangi yılın hangi ayında olmuştur?

In [27]:
orders_and_items_date = order_items.join(orders, order_items['orderItemOrderId'] == orders['orderId']) \
.select("orderId","orderItemProductId","orderItemSubTotal","orderStatus","orderDate")
orders_and_items_date.limit(5).toPandas().head()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,orderDate
0,1,957,299.98,CLOSED,2013-07-25 00:00:00.0
1,2,1073,199.99,PENDING_PAYMENT,2013-07-25 00:00:00.0
2,2,502,250.0,PENDING_PAYMENT,2013-07-25 00:00:00.0
3,2,403,129.99,PENDING_PAYMENT,2013-07-25 00:00:00.0
4,4,897,49.98,CLOSED,2013-07-25 00:00:00.0


In [36]:
df_year_and_month = orders_and_items_date.withColumn("orderDate", 
                                F.to_timestamp(F.col("orderDate"), "yyyy-MM-dd HH:mm:ss.S")) \
.withColumn("Year", F.year(F.col("orderDate"))) \
.withColumn("Month", F.month(F.col("orderDate")))

df_year_and_month.limit(5).toPandas()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,orderDate,Year,Month
0,1,957,299.98,CLOSED,2013-07-25,2013,7
1,2,1073,199.99,PENDING_PAYMENT,2013-07-25,2013,7
2,2,502,250.0,PENDING_PAYMENT,2013-07-25,2013,7
3,2,403,129.99,PENDING_PAYMENT,2013-07-25,2013,7
4,4,897,49.98,CLOSED,2013-07-25,2013,7


In [37]:
df_year_and_month.filter("orderStatus not in ('CANCELED')") \
.groupBy('Year','Month').agg(F.avg("orderItemSubTotal").alias("Avg_SubTotal")) \
.orderBy(F.desc("Avg_SubTotal")) \
.limit(10).toPandas()

Unnamed: 0,Year,Month,Avg_SubTotal
0,2014,6,203.35321
1,2013,7,200.611156
2,2014,5,200.126168
3,2014,3,199.530966
4,2013,9,199.481791
5,2014,1,199.321959
6,2013,12,199.305408
7,2013,11,199.258566
8,2014,7,198.761607
9,2013,10,198.57837


# 3.6. En yüksek ortalama satış haftanın hangi gününde olmuştur?

In [38]:
df_day_of_week = orders_and_items_date.withColumn("orderDate", 
                                F.to_timestamp(F.col("orderDate"), "yyyy-MM-dd HH:mm:ss.S")) \
.withColumn("Day", F.dayofweek(F.col("orderDate")))

df_day_of_week.limit(5).toPandas()

Unnamed: 0,orderId,orderItemProductId,orderItemSubTotal,orderStatus,orderDate,Day
0,1,957,299.98,CLOSED,2013-07-25,5
1,2,1073,199.99,PENDING_PAYMENT,2013-07-25,5
2,2,502,250.0,PENDING_PAYMENT,2013-07-25,5
3,2,403,129.99,PENDING_PAYMENT,2013-07-25,5
4,4,897,49.98,CLOSED,2013-07-25,5


In [39]:
df_day_of_week.filter("orderStatus not in ('CANCELED')") \
.groupBy('Day').agg(F.avg("orderItemSubTotal").alias("Avg_SubTotal")) \
.orderBy(F.desc("Avg_SubTotal")) \
.limit(10).toPandas()

Unnamed: 0,Day,Avg_SubTotal
0,4,200.507284
1,6,200.249034
2,5,200.096991
3,7,198.945504
4,1,198.934444
5,3,198.632952
6,2,197.968987
