<a href="https://colab.research.google.com/github/drshahizan/Python-big-data/blob/main/Assignment%202a/SamVerse/SamVerse_File_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark DataFrame Basics

Spark DataFrame allow for handling large dataset easier.



*   Easy Syntax
*   Ability to use SQL directly in the frame
*   Operations are distributed across Resilient Distributed Dataset (RDD)






## Create a DataFrame



Start by installing the PySpark package by inserting the following code:

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession

In [None]:
import pyspark.sql.functions as func

In [None]:
spark = SparkSession.builder.appName("pyspark_basics").getOrCreate()

In [None]:
path = "/content/drive/MyDrive/Colab Notebooks/1000000 Sales Records.csv"

In [None]:
df = spark.read.csv(path, header=True)

## Show DataFrame

Displaying data from the dataset. Unspecified range of data retrieval as shown below will display only the first 20 results.

In [None]:
df.show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|    503890.08| 165258.24|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|      Offl

Explore all the variables contained in the dataset along with its data types using printSchema()

In [None]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: string (nullable = true)
 |-- Unit Price: string (nullable = true)
 |-- Unit Cost: string (nullable = true)
 |-- Total Revenue: string (nullable = true)
 |-- Total Cost: string (nullable = true)
 |-- Total Profit: string (nullable = true)



## Specify Schema Structure


*   Some data types make it easier to infer schema
*   Spark has tools to help specify the structure



In [None]:
from pyspark.sql.types import StructField, IntegerType, StructType, FloatType

In [None]:
df = df.withColumn('Order ID', df['Order ID'].cast(IntegerType()))
df = df.withColumn('Units Sold', df['Units Sold'].cast(IntegerType()))
df = df.withColumn('Unit Price', df['Unit Price'].cast(FloatType()))
df = df.withColumn('Unit Cost', df['Unit Cost'].cast(FloatType()))
df = df.withColumn('Total Revenue', df['Total Revenue'].cast(FloatType()))
df = df.withColumn('Total Cost', df['Total Cost'].cast(FloatType()))
df = df.withColumn('Total Profit', df['Total Profit'].cast(FloatType()))

In [None]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: float (nullable = true)
 |-- Unit Cost: float (nullable = true)
 |-- Total Revenue: float (nullable = true)
 |-- Total Cost: float (nullable = true)
 |-- Total Profit: float (nullable = true)



## Grab Data

Unlike Pandas, PySpark has a different method to retrieve datas from the dataset.

In [None]:
df['Region']

Column<'Region'>

The method that's used in Pandas to retrieve data will only return as a column instead of the actual data. 

In [None]:
type(df['Region'])

pyspark.sql.column.Column

In [None]:
df.select('Region')

DataFrame[Region: string]

In [None]:
type(df.select('Region'))

pyspark.sql.dataframe.DataFrame

PySpark requires an additional step which is to include show() to actually retrieve any particular data.

In [None]:
df.select('Region').show(10)

+--------------------+
|              Region|
+--------------------+
|  Sub-Saharan Africa|
|Middle East and N...|
|Australia and Oce...|
|  Sub-Saharan Africa|
|              Europe|
|                Asia|
|  Sub-Saharan Africa|
|  Sub-Saharan Africa|
|  Sub-Saharan Africa|
|  Sub-Saharan Africa|
+--------------------+
only showing top 10 rows



In [None]:
df.select('Country','Sales Channel').show(10)

+----------------+-------------+
|         Country|Sales Channel|
+----------------+-------------+
|    South Africa|      Offline|
|         Morocco|       Online|
|Papua New Guinea|      Offline|
|        Djibouti|      Offline|
|        Slovakia|      Offline|
|       Sri Lanka|       Online|
|     Seychelles |       Online|
|        Tanzania|       Online|
|           Ghana|       Online|
|        Tanzania|      Offline|
+----------------+-------------+
only showing top 10 rows



In [None]:
df.head(2)

[Row(Region='Sub-Saharan Africa', Country='South Africa', Item Type='Fruits', Sales Channel='Offline', Order Priority='M', Order Date='7/27/2012', Order ID=443368995, Ship Date='7/28/2012', Units Sold=1593, Unit Price=9.329999923706055, Unit Cost=6.920000076293945, Total Revenue=14862.6904296875, Total Cost=11023.5595703125, Total Profit=3839.1298828125),
 Row(Region='Middle East and North Africa', Country='Morocco', Item Type='Clothes', Sales Channel='Online', Order Priority='M', Order Date='9/14/2013', Order ID=667593514, Ship Date='10/19/2013', Units Sold=4611, Unit Price=109.27999877929688, Unit Cost=35.84000015258789, Total Revenue=503890.09375, Total Cost=165258.234375, Total Profit=338631.84375)]

In [None]:
df.limit(10).show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|     503890.1| 165258.23|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|      Offl

In [None]:
# To get rid or whitespaces
from pyspark.sql import DataFrame
def fix_header(df: DataFrame) -> list:
    fixed_col_list: list = []
    for col in df.columns:
        fixed_col_list.append(f"`{str(col).strip()}` as {str(col).strip().replace(' ','_')}")
    return fixed_col_list

# Create a new dataframe with fixed column names
fixed_headers = fix_header(df)


# Apply to create the new dataframe
fixed_df = df.selectExpr(fixed_headers)
fixed_df.printSchema()
df = fixed_df
df.show(10)

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Order_Date: string (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: float (nullable = true)
 |-- Unit_Cost: float (nullable = true)
 |-- Total_Revenue: float (nullable = true)
 |-- Total_Cost: float (nullable = true)
 |-- Total_Profit: float (nullable = true)

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+-------------

## Create New Column

You can create a new column in the dataset by inserting the code below. However this will not make a permanent change to the orginal dataset.

In [None]:
df.withColumn('Profit per Unit', func.round(df['Unit_Price']-df['Unit_Cost'],2)).limit(10).show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+---------------+
|              Region|         Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|Profit per Unit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+---------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|           2.41|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|     503890.1| 165258.23|   338631.84|  

Rename a column name using withColumnRenamed() by inserting the column to be changed followed by the name to changed to.

In [None]:
df.withColumnRenamed('Country','Nation').limit(10).show()

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|          Nation|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|     503890.1| 165258.23|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|      Offl

## Using SQL

PySpark can also be used with similar method syntaxes from SQL.

In [None]:
df.createOrReplaceTempView('The_Dataset')

In [None]:
sql_results = spark.sql('SELECT * from The_Dataset')

In [None]:
sql_results

DataFrame[Region: string, Country: string, Item_Type: string, Sales_Channel: string, Order_Priority: string, Order_Date: string, Order_ID: int, Ship_Date: string, Units_Sold: int, Unit_Price: float, Unit_Cost: float, Total_Revenue: float, Total_Cost: float, Total_Profit: float]

In [None]:
sql_results.show(10)

+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|         Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+----------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Sub-Saharan Africa|    South Africa|         Fruits|      Offline|             M| 7/27/2012|443368995| 7/28/2012|      1593|      9.33|     6.92|     14862.69|  11023.56|     3839.13|
|Middle East and N...|         Morocco|        Clothes|       Online|             M| 9/14/2013|667593514|10/19/2013|      4611|    109.28|    35.84|     503890.1| 165258.23|   338631.84|
|Australia and Oce...|Papua New Guinea|           Meat|      Offl

In [None]:
spark.sql('SELECT * FROM The_Dataset WHERE Region = "Asia"').show(10)

+------+------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Region|     Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+------+------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|  Asia|   Sri Lanka|         Fruits|       Online|             L| 11/7/2011|830192887|12/18/2011|      1379|      9.33|     6.92|     12866.07|   9542.68|     3323.39|
|  Asia|      Taiwan|         Fruits|      Offline|             M|  2/9/2014|732588374| 2/23/2014|      8034|      9.33|     6.92|     74957.22|  55595.28|    19361.94|
|  Asia|   Singapore|         Snacks|       Online|             C| 1/28/2013|176461303|  2/7/2013|      7676|    152.58|    97.44|    1171204.1| 747949.44|

In [None]:
spark.sql('SELECT Item_Type, Units_Sold FROM The_Dataset').show(10)

+---------------+----------+
|      Item_Type|Units_Sold|
+---------------+----------+
|         Fruits|      1593|
|        Clothes|      4611|
|           Meat|       360|
|        Clothes|       562|
|      Beverages|      3973|
|         Fruits|      1379|
|      Beverages|       597|
|      Beverages|      1476|
|Office Supplies|       896|
|      Cosmetics|      7768|
+---------------+----------+
only showing top 10 rows

