# Read csv file from the local fileSystem

In [0]:
df=spark.read.options(header=True,inferSchema=True,delimiter=',').csv("/FileStore/tables/Business_Sales_Transaction.csv")
display(df)

TransactionNo,Date,ProductNo,ProductName,Price,Quantity,CustomerNo,Country
536365,2018-12-01,85123A,Cream Hanging Heart T-Light Holder,1.88,6,17850,United Kingdom
536365,2018-12-01,71053,White Moroccan Metal Lantern,2.01,6,17850,United Kingdom
536365,2018-12-01,84406B,Cream Cupid Hearts Coat Hanger,1.91,8,17850,United Kingdom
536365,2018-12-01,84029G,Knitted Union Flag Hot Water Bottle,2.01,6,17850,United Kingdom
536365,2018-12-01,84029E,Red Woolly Hottie White Heart,2.01,6,17850,United Kingdom
536365,2018-12-01,22752,Set 7 Babushka Nesting Boxes,2.65,2,17850,United Kingdom
536365,2018-12-01,21730,Glass Star Frosted T-Light Holder,2.14,6,17850,United Kingdom
536366,2018-12-01,22633,Hand Warmer Union Jack,1.78,6,17850,United Kingdom
536366,2018-12-01,22632,Hand Warmer Red Retrospot,1.78,6,17850,United Kingdom
536367,2018-12-01,84879,Assorted Colour Bird Ornament,1.75,32,13047,United Kingdom


# Count nbr of records

In [0]:
dfCount=df.count()
print(dfCount)

536350


# Defining & Applying a Schema

In [0]:
df.printSchema()

root
 |-- TransactionNo: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- ProductNo: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- CustomerNo: string (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
#Converting date from string to date
from pyspark.sql.functions import col,to_date
df=df.withColumn('date',to_date(col('date')))
df.printSchema()

root
 |-- TransactionNo: string (nullable = true)
 |-- date: date (nullable = true)
 |-- ProductNo: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- CustomerNo: string (nullable = true)
 |-- Country: string (nullable = true)



# Checking Null Values

In [0]:
from pyspark.sql.functions import when,count
df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+-------------+----+---------+-----------+-----+--------+----------+-------+
|TransactionNo|date|ProductNo|ProductName|Price|Quantity|CustomerNo|Country|
+-------------+----+---------+-----------+-----+--------+----------+-------+
|            0|   0|        0|          0|    0|       0|         0|      0|
+-------------+----+---------+-----------+-----+--------+----------+-------+



# Checking Canceled Transaction

In [0]:
transC=df[df['TransactionNo'].startswith('C')].count()
#How many valid transactions we have
print(dfCount-transC)

527765


# Sales Per Month

In [0]:
from pyspark.sql.functions import month
valid_trans=df[~(df['TransactionNo'].startswith('C'))]
grouped_df=valid_trans.groupBy(month('date').alias('month')).agg(count('TransactionNo').alias('valid_transactions')).orderBy('valid_transactions',ascending=False)
display(grouped_df)

month,valid_transactions
11,83042
12,66429
10,59041
9,49007
7,38398
5,36015
6,35807
3,35644
8,34318
1,34137


# Most Frequently Purchased Products

In [0]:
groupedDF=valid_trans.groupBy('ProductName').agg(count('TransactionNo').alias('countTransactions')).orderBy('countTransactions',ascending=False)
display(groupedDF.limit(5))

ProductName,countTransactions
Cream Hanging Heart T-Light Holder,2336
Jumbo Bag Red Retrospot,2115
Regency Cakestand 3 Tier,2019
Party Bunting,1708
Lunch Bag Red Retrospot,1597


# How many products does the customer purchase in each transaction

In [0]:
groupedDF=valid_trans.groupBy('TransactionNo').agg(count('ProductNo').alias('count_of_products_purchased')).orderBy('count_of_products_purchased',ascending=False)
display(groupedDF)

TransactionNo,count_of_products_purchased
573585,1111
581219,747
581492,730
580729,720
558475,704
579777,685
581217,674
537434,673
580730,661
580367,649


# What are the most profitable segment customers?

In [0]:
groupedDF=valid_trans.groupBy('Country').agg(count('TransactionNo').alias('NbValidTransactions')).orderBy('NbValidTransactions',ascending=False)
display(groupedDF)

Country,NbValidTransactions
United Kingdom,477771
France,10393
Germany,10240
EIRE,7807
Belgium,2507
Spain,2386
Netherlands,2326
Switzerland,2303
Portugal,1838
Australia,1631


# Loading back to DFBS

In [0]:
#valid transactions
valid_trans.write.mode('overwrite').parquet('/tmp/valid_transactions.parquet')

In [None]:
%fs ls /tmp/valid_transactions.parquet