# Analyzing Sales Data Using Apache Spark 2.4
### gyleodhis@outlook.com
### [@gyleodhis](https://www.twitter.com/gyleodhis)
### ![@gyleodhis](./data/gyle.jpg)
#### Licence:
You may use this code for anything you wish; just leave this page intact:
#### AS IS; HOW IS, WHERE IS

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('data_processing').getOrCreate()

In [2]:
sales = spark.read.csv('./data/1500000 Sales Records.csv', inferSchema=True, header=True)
sales.columns # prints column names

['Region',
 'Country',
 'Item Type',
 'Sales Channel',
 'Order Priority',
 'Order Date',
 'Order ID',
 'Ship Date',
 'Units Sold',
 'Unit Price',
 'Unit Cost',
 'Total Revenue',
 'Total Cost',
 'Total Profit']

In [3]:
len(sales.columns) # total number of columns.

14

In [4]:
sales.count() # Returns the number of records in the dataset

1500000

It is always good practice to print the shape of the dataframe before preprocessing, as it shows the total number of rows and columns. Spark has no direct function for checking the shape of the data; instead we combine the row count with the length of the column list to print the shape.
#### Do not confuse this with the pandas DataFrame.shape attribute.

In [5]:
print((sales.count(),len(sales.columns))) #prints number of rows and columns

(1500000, 14)


#### Showing datatypes of columns

In [6]:
sales.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [7]:
sales.show(5) # show() displays the top 20 rows unless a number is specified

+--------------------+----------------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|   Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|  Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|    503890.08| 165258.24|   338631.84|
|Australia and Oce...|Papua New Guinea|     Meat|      Offline|             M| 5/15/2015|940995

### Using .select() method to show only certain columns.

In [8]:
sales.select('Region','Country','Item Type','Total Profit').show(20)

+--------------------+----------------+---------------+------------+
|              Region|         Country|      Item Type|Total Profit|
+--------------------+----------------+---------------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|     3839.13|
|Middle East and N...|         Morocco|        Clothes|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|     20592.0|
|  Sub-Saharan Africa|        Djibouti|        Clothes|    41273.28|
|              Europe|        Slovakia|      Beverages|    62217.18|
|                Asia|       Sri Lanka|         Fruits|     3323.39|
|  Sub-Saharan Africa|     Seychelles |      Beverages|     9349.02|
|  Sub-Saharan Africa|        Tanzania|      Beverages|    23114.16|
|  Sub-Saharan Africa|           Ghana|Office Supplies|    113120.0|
|  Sub-Saharan Africa|        Tanzania|      Cosmetics|  1350622.16|
|                Asia|          Taiwan|         Fruits|    19361.94|
|Middle East and N...|         Alg

In [9]:
# Describing the data
sales.describe().show()
# For numerical columns, it returns the measure of the center and spread along with the count.
# For nonnumerical columns, it shows the count and the min and max values, which are based on
# alphabetic order of those fields and doesn’t signify any real meaning

+-------+------------------+-----------+----------+-------------+--------------+----------+--------------------+---------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|            Region|    Country| Item Type|Sales Channel|Order Priority|Order Date|            Order ID|Ship Date|        Units Sold|        Unit Price|         Unit Cost|     Total Revenue|        Total Cost|      Total Profit|
+-------+------------------+-----------+----------+-------------+--------------+----------+--------------------+---------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|           1500000|    1500000|   1500000|      1500000|       1500000|   1500000|             1500000|  1500000|           1500000|           1500000|           1500000|           1500000|           1500000|           1500000|
|   mean|              null|       null|      null| 

## Adding a New Column to a dataframe:
We can add a new column to a dataframe using Spark's withColumn function. Because the sales data has no age column, we switch to a small sample dataset, df, which is easier to work with, and add a new column (age after 10 years) derived from its age column. We simply add 10 to each value in the age column:

In [10]:
df = spark.read.csv('./data/sample_data.csv', inferSchema=True, header=True)
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10)

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family| mobile|age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|      3| 32|       9.0|     3|   Vivo|              42|
|      3| 27|      13.0|     3|  Apple|              37|
|      4| 22|       2.5|     0|Samsung|              32|
|      4| 37|      16.5|     4|  Apple|              47|
|      5| 27|       9.0|     1|     MI|              37|
|      4| 27|       9.0|     0|   Oppo|              37|
|      5| 37|      23.0|     5|   Vivo|              47|
|      5| 37|      23.0|     5|Samsung|              47|
|      3| 22|       2.5|     0|  Apple|              32|
|      3| 27|       6.0|     0|     MI|              37|
+-------+---+----------+------+-------+----------------+
only showing top 10 rows



In [11]:
# Reassign df to keep the new column: DataFrames are immutable, so withColumn returns a new DataFrame
df=df.withColumn("age_after_10_yrs",(df["age"]+10))
df.show()

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family| mobile|age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|      3| 32|       9.0|     3|   Vivo|              42|
|      3| 27|      13.0|     3|  Apple|              37|
|      4| 22|       2.5|     0|Samsung|              32|
|      4| 37|      16.5|     4|  Apple|              47|
|      5| 27|       9.0|     1|     MI|              37|
|      4| 27|       9.0|     0|   Oppo|              37|
|      5| 37|      23.0|     5|   Vivo|              47|
|      5| 37|      23.0|     5|Samsung|              47|
|      3| 22|       2.5|     0|  Apple|              32|
|      3| 27|       6.0|     0|     MI|              37|
|      2| 27|       6.0|     2|   Oppo|              37|
|      5| 27|       6.0|     2|Samsung|              37|
|      3| 37|      16.5|     5|  Apple|              47|
|      5| 27|       6.0|     0|     MI|              37|
|      4| 22|       6.0|     1|

### Changing the datatype of a Column
To change the datatype of the age column from integer to double,
we can use the cast method in Spark. We need to import
DoubleType from pyspark.sql.types:

In [12]:
from pyspark.sql.types import StringType, DoubleType
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10)

+-------+---+----------+------+-------+----------------+----------+
|ratings|age|experience|family| mobile|age_after_10_yrs|age_double|
+-------+---+----------+------+-------+----------------+----------+
|      3| 32|       9.0|     3|   Vivo|              42|      32.0|
|      3| 27|      13.0|     3|  Apple|              37|      27.0|
|      4| 22|       2.5|     0|Samsung|              32|      22.0|
|      4| 37|      16.5|     4|  Apple|              47|      37.0|
|      5| 27|       9.0|     1|     MI|              37|      27.0|
|      4| 27|       9.0|     0|   Oppo|              37|      27.0|
|      5| 37|      23.0|     5|   Vivo|              47|      37.0|
|      5| 37|      23.0|     5|Samsung|              47|      37.0|
|      3| 22|       2.5|     0|  Apple|              32|      22.0|
|      3| 27|       6.0|     0|     MI|              37|      27.0|
+-------+---+----------+------+-------+----------------+----------+
only showing top 10 rows



## Filtering Data
Filtering records based on conditions is a common requirement when dealing with data. It helps clean the data and keep only relevant records. In PySpark this is done with the filter() function.

In [13]:
Africa = sales.filter(sales['Region']=='Sub-Saharan Africa') # keeps only records from the Sub-Saharan Africa region
Africa.select('Region','Country','Item Type','Total Profit').show(20)

+------------------+--------------------+---------------+------------+
|            Region|             Country|      Item Type|Total Profit|
+------------------+--------------------+---------------+------------+
|Sub-Saharan Africa|        South Africa|         Fruits|     3839.13|
|Sub-Saharan Africa|            Djibouti|        Clothes|    41273.28|
|Sub-Saharan Africa|         Seychelles |      Beverages|     9349.02|
|Sub-Saharan Africa|            Tanzania|      Beverages|    23114.16|
|Sub-Saharan Africa|               Ghana|Office Supplies|    113120.0|
|Sub-Saharan Africa|            Tanzania|      Cosmetics|  1350622.16|
|Sub-Saharan Africa|              Uganda|  Personal Care|    11302.06|
|Sub-Saharan Africa|            Zimbabwe|Office Supplies|  1214903.75|
|Sub-Saharan Africa|            Ethiopia|      Cosmetics|   115101.94|
|Sub-Saharan Africa|              Uganda|      Cosmetics|  1048609.97|
|Sub-Saharan Africa|Sao Tome and Prin...|        Clothes|    202694.4|
|Sub-S

### Multiple Column Based Filtering
Chaining filter() calls returns records only if all the conditions are met.

In [14]:
multiplefilter = Africa.filter(Africa['Region']=='Sub-Saharan Africa').filter(Africa['Item Type']=='Fruits')
multiplefilter.select('Region','Country','Item Type','Total Profit').show(20)

+------------------+--------------------+---------+------------+
|            Region|             Country|Item Type|Total Profit|
+------------------+--------------------+---------+------------+
|Sub-Saharan Africa|        South Africa|   Fruits|     3839.13|
|Sub-Saharan Africa|            Tanzania|   Fruits|    23133.59|
|Sub-Saharan Africa|              Rwanda|   Fruits|    14602.19|
|Sub-Saharan Africa|          The Gambia|   Fruits|    20964.59|
|Sub-Saharan Africa|               Ghana|   Fruits|     3188.43|
|Sub-Saharan Africa|             Nigeria|   Fruits|     15978.3|
|Sub-Saharan Africa|          Mauritius |   Fruits|     5945.47|
|Sub-Saharan Africa|             Namibia|   Fruits|    23924.07|
|Sub-Saharan Africa|             Namibia|   Fruits|    11999.39|
|Sub-Saharan Africa|          Mozambique|   Fruits|     2248.53|
|Sub-Saharan Africa|Sao Tome and Prin...|   Fruits|    13170.65|
|Sub-Saharan Africa|           Swaziland|   Fruits|     22244.3|
|Sub-Saharan Africa|     

### Distinct Values in One or More Columns

In [15]:
Africa.select('Country').distinct().show() # shows the distinct countries in the Sub-Saharan Africa data
# Replacing .show() with .count() returns the number of distinct values.

+--------------------+
|             Country|
+--------------------+
|                Chad|
|             Senegal|
|             Eritrea|
|            Djibouti|
|              Malawi|
|             Comoros|
|              Rwanda|
|               Sudan|
|                Togo|
|   Equatorial Guinea|
|              Angola|
|             Lesotho|
|          Madagascar|
|               Ghana|
|          The Gambia|
|        Sierra Leone|
|               Benin|
|Sao Tome and Prin...|
|             Burundi|
|             Nigeria|
+--------------------+
only showing top 20 rows



## Grouping Data
Grouping can help us understand the various aspects of the data and extract insights.

In [16]:
sales.groupBy('Region').count().show()

+--------------------+------+
|              Region| count|
+--------------------+------+
|Middle East and N...|186391|
|Australia and Oce...|121405|
|              Europe|389079|
|  Sub-Saharan Africa|389607|
|Central America a...|162108|
|       North America| 32528|
|                Asia|218882|
+--------------------+------+



Sorting the grouped counts in descending order:

In [17]:
sales.groupBy('Region').count().orderBy('count', ascending=False).show()

+--------------------+------+
|              Region| count|
+--------------------+------+
|  Sub-Saharan Africa|389607|
|              Europe|389079|
|                Asia|218882|
|Middle East and N...|186391|
|Central America a...|162108|
|Australia and Oce...|121405|
|       North America| 32528|
+--------------------+------+



In [18]:
# Calculating summary statistics (here, the mean of each numeric column) per region with groupBy
sales.groupBy('Region').mean().show(5,False)

+---------------------------------+-------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|Region                           |avg(Order ID)      |avg(Units Sold)   |avg(Unit Price)   |avg(Unit Cost)    |avg(Total Revenue)|avg(Total Cost)  |avg(Total Profit) |
+---------------------------------+-------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|Middle East and North Africa     |5.500766295723345E8|4998.727250779276 |266.49557591298105|187.934668090201  |1330032.1388415764|937878.8473834554|392153.29145811684|
|Australia and Oceania            |5.501626799835098E8|5003.66386063177  |265.81827264118755|187.33434479634366|1330484.4323763414|937830.4436155829|392653.9887607592 |
|Europe                           |5.499526623995693E8|5002.550484605954 |265.9982680122825 |187.52497300025738|1330109.893156062 |937733.0395847369|392376

In [19]:
Africa.groupBy('country').sum().show(5)

+--------+-------------+---------------+------------------+------------------+--------------------+-------------------+--------------------+
| country|sum(Order ID)|sum(Units Sold)|   sum(Unit Price)|    sum(Unit Cost)|  sum(Total Revenue)|    sum(Total Cost)|   sum(Total Profit)|
+--------+-------------+---------------+------------------+------------------+--------------------+-------------------+--------------------+
|    Chad|4478281870537|       40971302| 2186699.500000046|1540827.2899999935|1.102570514302000...|7.771325528890009E9| 3.254379614129998E9|
| Senegal|4515355704559|       41032763| 2140684.300000047|1501231.1999999923|1.084949844780000...|7.612428578419999E9| 3.237069869379999E9|
| Eritrea|4392379601850|       40109911|2138600.5000000484|1506253.7499999923|1.063781470289000...|7.489852930409997E9| 3.147961772479999E9|
|Djibouti|4483980942309|       40227173| 2158737.920000045|1521159.5699999928|1.073054205282001...|7.568167534470009E9|3.1623745183499985E9|
|  Malawi|449

In [20]:
# Viewing the maximum of each numeric column per region (use .min() for the minimum)
sales.groupBy('Region').max().show(5,False)

+---------------------------------+-------------+---------------+---------------+--------------+------------------+---------------+-----------------+
|Region                           |max(Order ID)|max(Units Sold)|max(Unit Price)|max(Unit Cost)|max(Total Revenue)|max(Total Cost)|max(Total Profit)|
+---------------------------------+-------------+---------------+---------------+--------------+------------------+---------------+-----------------+
|Middle East and North Africa     |999999892    |10000          |668.27         |524.96        |6682700.0         |5249075.04     |1738700.0        |
|Australia and Oceania            |999988305    |10000          |668.27         |524.96        |6679358.65        |5246975.2      |1737830.65       |
|Europe                           |999998605    |10000          |668.27         |524.96        |6682700.0         |5249600.0      |1738700.0        |
|Sub-Saharan Africa               |999996888    |10000          |668.27         |524.96        |6682

## User-Defined Functions (UDFs)
There are two types of UDFs available in PySpark: conventional UDFs and Pandas UDFs. Pandas UDFs are generally much faster, because they process data in batches (as pandas Series exchanged through Apache Arrow) instead of calling Python once per row.

In [21]:
from pyspark.sql.functions import udf # Importing udf  from pyspark functions.

### Defining UDF using lambda

In [22]:
Africa_udf = udf(lambda  Region: "African Country" if Region == "Sub-Saharan Africa" else "Not African Country", StringType())
sales.withColumn("African Country Only", Africa_udf(sales.Region)).select("Region","Country","African Country Only").show(10)

+--------------------+----------------+--------------------+
|              Region|         Country|African Country Only|
+--------------------+----------------+--------------------+
|  Sub-Saharan Africa|    South Africa|     African Country|
|Middle East and N...|         Morocco| Not African Country|
|Australia and Oce...|Papua New Guinea| Not African Country|
|  Sub-Saharan Africa|        Djibouti|     African Country|
|              Europe|        Slovakia| Not African Country|
|                Asia|       Sri Lanka| Not African Country|
|  Sub-Saharan Africa|     Seychelles |     African Country|
|  Sub-Saharan Africa|        Tanzania|     African Country|
|  Sub-Saharan Africa|           Ghana|     African Country|
|  Sub-Saharan Africa|        Tanzania|     African Country|
+--------------------+----------------+--------------------+
only showing top 10 rows



#### From the output above we see that countries in North Africa are not classified as African countries, because they belong to the "Middle East and North Africa" region.

### Pandas UDFs
Note that Pandas UDFs are the more efficient of the two types in terms of speed and processing time.

In [23]:
# pandas_udf ran into issues in this Spark 2.4 setup, so the cell is left commented out
#from pyspark.sql.functions import col, pandas_udf

## Dropping Duplicate Values

In [24]:
sales.count() # number of rows in the DataFrame

1500000

In [25]:
sales.dropDuplicates().count()  # number of rows after removing fully duplicated rows

1391396

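## Deleting Columns
The drop() function removes one or more columns and returns a new DataFrame; the sales DataFrame itself is not modified: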

In [26]:
sales.drop("Region", "Item Type", "Order Priority", "Order ID", "Units Sold").show()

+----------------+-------------+----------+----------+----------+---------+-------------+----------+------------+
|         Country|Sales Channel|Order Date| Ship Date|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+----------------+-------------+----------+----------+----------+---------+-------------+----------+------------+
|    South Africa|      Offline| 7/27/2012| 7/28/2012|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|         Morocco|       Online| 9/14/2013|10/19/2013|    109.28|    35.84|    503890.08| 165258.24|   338631.84|
|Papua New Guinea|      Offline| 5/15/2015|  6/4/2015|    421.89|   364.69|     151880.4|  131288.4|     20592.0|
|        Djibouti|      Offline| 5/17/2017|  7/2/2017|    109.28|    35.84|     61415.36|  20142.08|    41273.28|
|        Slovakia|      Offline|10/26/2016| 12/4/2016|     47.45|    31.79|    188518.85| 126301.67|    62217.18|
|       Sri Lanka|       Online| 11/7/2011|12/18/2011|      9.33|     6.92|     12866.07

## Writing data to file
### CSV
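To save the data in CSV format we use the DataFrame writer (df.write). Calling coalesce(1) first merges the data into a single partition, so Spark writes one CSV part file into the output directory instead of one file per partition: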

In [27]:
#pwd # prints the working directory.
write_uri = "./data/new/"
df.coalesce(1).write.format("csv").option("header","true").save(write_uri)

### Parquet
If the dataset is large and has many columns, we can save it in the Parquet format instead. Parquet is a compressed, columnar file format: it reduces the overall size of the data and speeds up processing, because Spark can read only the columns a query needs instead of the entire dataset. We can save the dataframe as Parquet simply by specifying parquet as the format:

In [28]:
Salesdropped = sales.drop("Item Type", "Order Priority", "Order ID", "Units Sold","Sales Channel","Order Date","Ship Date","Unit Price","Unit Cost","Total Revenue","Total Cost","Total Profit")
parquet_uri = "./data/parquet/"
Salesdropped.write.format('parquet').save(parquet_uri)