In [164]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, avg

In [165]:
spark = SparkSession.builder \
    .appName("Transfrom") \
    .master("local[*]") \
    .getOrCreate()

In [166]:
path= r"C://Users//44754//Downloads//sales.csv"

In [167]:
dataframe = spark.read.csv(path,header = True, inferSchema = True)

In [168]:
dataframe.show()

+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H| 5/28/2010|669165933| 6/27/2010|      9925|    255.28|   159.42|    2533654.0| 1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|       Online|             C| 8/22/2012|963881480| 9/15/2012|      2804|     205.7|   117.11|     576782.8| 328376.44|   248406.36|
|              Europe|              Russia|Of

In [170]:
# rename the column
dataframe = dataframe.withColumnRenamed('Order Date','Purchase Date')
dataframe.show()

+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Purchase Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H|    5/28/2010|669165933| 6/27/2010|      9925|    255.28|   159.42|    2533654.0| 1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|       Online|             C|    8/22/2012|963881480| 9/15/2012|      2804|     205.7|   117.11|     576782.8| 328376.44|   248406.36|
|              Europe|        

In [171]:
dataframe.dtypes

[('Region', 'string'),
 ('Country', 'string'),
 ('Item Type', 'string'),
 ('Sales Channel', 'string'),
 ('Order Priority', 'string'),
 ('Purchase Date', 'string'),
 ('Order ID', 'int'),
 ('Ship Date', 'string'),
 ('Units Sold', 'int'),
 ('Unit Price', 'double'),
 ('Unit Cost', 'double'),
 ('Total Revenue', 'double'),
 ('Total Cost', 'double'),
 ('Total Profit', 'double')]

In [173]:
# Filter Data Based on Condition
dataframefilter = dataframe.filter(col('Item Type') == 'Household')
dataframefilter.show(5)

+--------------------+------------+---------+-------------+--------------+-------------+---------+---------+----------+----------+---------+-------------+----------+------------+
|              Region|     Country|Item Type|Sales Channel|Order Priority|Purchase Date| Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+------------+---------+-------------+--------------+-------------+---------+---------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|      Angola|Household|      Offline|             M|    4/23/2011|135425221|4/27/2011|      4187|    668.27|   502.54|   2798046.49|2104134.98|   693911.51|
|Central America a...|    Honduras|Household|      Offline|             H|     2/8/2017|522840487|2/13/2017|      8974|    668.27|   502.54|   5997054.98|4509793.96|  1487261.02|
|                Asia|Turkmenistan|Household|      Offline|             L|   12/30/2010|441619336|1/20/20

In [184]:
# rename a column
dataframe1 = dataframe.withColumnRenamed('Total Revenue', 'Total Sales')


In [185]:
dataframe2 = dataframe1.withColumnRenamed('Total Cost', 'Total Purchase')

In [186]:
dataframe2.show()

+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+----------+----------+----------+---------+-----------+--------------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Purchase Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Sales|Total Purchase|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+----------+----------+----------+---------+-----------+--------------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H|    5/28/2010|669165933| 6/27/2010|      9925|    255.28|   159.42|  2533654.0|     1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|       Online|             C|    8/22/2012|963881480| 9/15/2012|      2804|     205.7|   117.11|   576782.8|     328376.44|   248406.36|
|              Europ

In [163]:
# Add a new column

dataframe = dataframe.withColumn('Total sales', col('Unit Price') * col('Units Sold'))
dataframe.show(5)

+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+---------+----------+----------+---------+----------+------------+-----------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Purchase Date| Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Cost|Total Profit|      Total sales|
+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+---------+----------+----------+---------+----------+------------+-----------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H|    5/28/2010|669165933|6/27/2010|      9925|    255.28|   159.42| 1582243.5|    951410.5|        2533654.0|
|Central America a...|             Grenada|         Cereal|       Online|             C|    8/22/2012|963881480|9/15/2012|      2804|     205.7|   117.11| 328376.44|   248406.36|576782.7999999999|
|              

In [187]:
dataframe2.dtypes

[('Region', 'string'),
 ('Country', 'string'),
 ('Item Type', 'string'),
 ('Sales Channel', 'string'),
 ('Order Priority', 'string'),
 ('Purchase Date', 'string'),
 ('Order ID', 'int'),
 ('Ship Date', 'string'),
 ('Units Sold', 'int'),
 ('Unit Price', 'double'),
 ('Unit Cost', 'double'),
 ('Total Sales', 'double'),
 ('Total Purchase', 'double'),
 ('Total Profit', 'double')]

In [188]:
# drop a column

dataframe3 = dataframe2.drop('Total Revenue')

In [189]:
dataframe3.show(5)

+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+---------+----------+----------+---------+-----------+--------------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Purchase Date| Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Sales|Total Purchase|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+-------------+---------+---------+----------+----------+---------+-----------+--------------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H|    5/28/2010|669165933|6/27/2010|      9925|    255.28|   159.42|  2533654.0|     1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|       Online|             C|    8/22/2012|963881480|9/15/2012|      2804|     205.7|   117.11|   576782.8|     328376.44|   248406.36|
|              Europe|   

In [190]:
#drop orderid
dataframe4 = dataframe3.drop('Order ID')

In [191]:
dataframe4.dtypes

[('Region', 'string'),
 ('Country', 'string'),
 ('Item Type', 'string'),
 ('Sales Channel', 'string'),
 ('Order Priority', 'string'),
 ('Purchase Date', 'string'),
 ('Ship Date', 'string'),
 ('Units Sold', 'int'),
 ('Unit Price', 'double'),
 ('Unit Cost', 'double'),
 ('Total Sales', 'double'),
 ('Total Purchase', 'double'),
 ('Total Profit', 'double')]

In [192]:
# Group By and Aggregate
groupdf = dataframe4.groupBy('Order Priority').agg(sum('Total sales'))

In [193]:
from pyspark.sql.functions import col, lit, when, sum, avg

In [194]:
groupdf.show()

+--------------+--------------------+
|Order Priority|    sum(Total sales)|
+--------------+--------------------+
|             L|       3.662812746E7|
|             M|3.3116031750000004E7|
|             C|1.8855063050000004E7|
|             H| 4.874954604999999E7|
+--------------+--------------------+



In [154]:
# how to format sum(total sales)
from pyspark.sql.functions import sum, format_number


In [196]:
formatted_df = dataframe4.groupBy("Order Priority") \
    .agg(sum("Total Sales").alias("Total_Sales_Sum")) \
    .withColumn("Formatted_Total_Sales", format_number("Total_Sales_Sum", 2)) \
    .select("Order Priority", "Formatted_Total_Sales")

In [197]:
formatted_df.show()

+--------------+---------------------+
|Order Priority|Formatted_Total_Sales|
+--------------+---------------------+
|             L|        36,628,127.46|
|             M|        33,116,031.75|
|             C|        18,855,063.05|
|             H|        48,749,546.05|
+--------------+---------------------+



In [199]:

# group by revenue
revenue = dataframe4.groupBy('Region').sum('Total Sales')

In [201]:
revenue.show()

+--------------------+--------------------+
|              Region|    sum(Total Sales)|
+--------------------+--------------------+
|Middle East and N...|       1.405270658E7|
|Australia and Oce...|1.4094265130000003E7|
|              Europe|       3.336893211E7|
|  Sub-Saharan Africa| 3.967203143000001E7|
|Central America a...|          9170385.49|
|       North America|   5643356.550000001|
|                Asia|2.1347091020000003E7|
+--------------------+--------------------+



In [204]:
formatreg = revenue.withColumn("sum(Total Sales)", format_number(col("sum(Total Sales)"),2))

In [226]:
formatreg.show()

+--------------------+----------------+
|              Region|sum(Total Sales)|
+--------------------+----------------+
|Middle East and N...|   14,052,706.58|
|Australia and Oce...|   14,094,265.13|
|              Europe|   33,368,932.11|
|  Sub-Saharan Africa|   39,672,031.43|
|Central America a...|    9,170,385.49|
|       North America|    5,643,356.55|
|                Asia|   21,347,091.02|
+--------------------+----------------+



In [227]:
# sort data by column
sortdf = dataframe4.orderBy('Order Priority')

In [229]:
sortdf.show()


+--------------------+--------------------+---------------+-------------+--------------+-------------+----------+----------+----------+---------+-----------+--------------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Purchase Date| Ship Date|Units Sold|Unit Price|Unit Cost|Total Sales|Total Purchase|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+-------------+----------+----------+----------+---------+-----------+--------------+------------+
|Central America a...|             Grenada|         Cereal|       Online|             C|    8/22/2012| 9/15/2012|      2804|     205.7|   117.11|   576782.8|     328376.44|   248406.36|
|  Sub-Saharan Africa|Sao Tome and Prin...|         Fruits|       Online|             C|    6/20/2014|  7/5/2014|      8102|      9.33|     6.92|   75591.66|      56065.84|    19525.82|
|Australia and Oce...|     Solomon Islands|      Baby Food|       Onli

In [230]:
# replaced value in column
relaced = dataframe4.withColumn('Sales Channel', when(col('Sales Channel') == 'online',1). otherwise(0))

In [232]:
relaced.show()

+--------------------+--------------------+---------------+-------------+--------------+-------------+----------+----------+----------+---------+-----------+--------------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Purchase Date| Ship Date|Units Sold|Unit Price|Unit Cost|Total Sales|Total Purchase|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+-------------+----------+----------+----------+---------+-----------+--------------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|            0|             H|    5/28/2010| 6/27/2010|      9925|    255.28|   159.42|  2533654.0|     1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|            0|             C|    8/22/2012| 9/15/2012|      2804|     205.7|   117.11|   576782.8|     328376.44|   248406.36|
|              Europe|              Russia|Office Supplies|           

In [243]:
#group by sales
groupchannel = dataframe4.groupBy("Sales Channel").agg(sum("Total Sales").alias('Total Salessum'))

In [244]:
groupchannel.show()

+-------------+-------------------+
|Sales Channel|     Total Salessum|
+-------------+-------------------+
|       Online|5.825395911000001E7|
|      Offline|7.909480919999997E7|
+-------------+-------------------+



In [249]:
formatted_df = groupchannel.withColumn("Total Salessum", format_number(col("Total Salessum"), 2))


In [250]:
formatted_df.show()

+-------------+--------------+
|Sales Channel|Total Salessum|
+-------------+--------------+
|       Online| 58,253,959.11|
|      Offline| 79,094,809.20|
+-------------+--------------+

