In [None]:
# install the dependencies
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop3.2.tgz
!tar xf spark-3.1.3-bin-hadoop3.2.tgz
!pip -q install findspark

In [None]:
#Setting up variables and finding Spark with findspark module
import os
import findspark

# Point the environment at the JDK and the Spark distribution installed above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop3.2"
# The Delta Lake artifact must match the Spark release line: Spark 3.1.x pairs
# with delta-core 1.0.x (0.7.0 was built against Spark 3.0.x).
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages io.delta:delta-core_2.12:1.0.1 '
    '--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension '
    '--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog '
    'pyspark-shell'
)
# Locate the Spark installation via SPARK_HOME so that pyspark can be imported.
findspark.init()

In [None]:
#Creating SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *

# Build the session used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Spark Training')
    .getOrCreate()
)

In [None]:
path = '/content/bakery_sales.csv'

# A DataFrame can also be created with the read.csv / read.json shortcuts,
# with createDataFrame, or by supplying an explicit schema.
#
# Example:
#   df_bakery = (
#       spark.read.option("inferSchema", True)
#                 .option("header", True)
#                 .csv(path)
#   )
#
# or, from an existing table:
#   df_bakery = spark.read.table("<catalog_name>.<schema_name>.<table_name>")

# Load the CSV through the generic reader: the first line is the header,
# fields are comma-separated and column types are inferred from the data.
df_bakery = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("delimiter", ",")
    .option("inferSchema", True)
    .load(path)
)


In [None]:
# Print the column names, inferred types and nullability as a tree.
df_bakery.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- ticket_number: double (nullable = true)
 |-- article: string (nullable = true)
 |-- Quantity: double (nullable = true)
 |-- unit_price: string (nullable = true)



In [None]:
# Print the first rows of the DataFrame as a text table.
# Alternative where the notebook environment provides display(): display(df_bakery)
df_bakery.show()

+---+----------+-----+-------------+--------------------+--------+----------+
|_c0|      date| time|ticket_number|             article|Quantity|unit_price|
+---+----------+-----+-------------+--------------------+--------+----------+
|  0|2021-01-02|08:38|     150040.0|            BAGUETTE|     1.0|    0,90 €|
|  1|2021-01-02|08:38|     150040.0|    PAIN AU CHOCOLAT|     3.0|    1,20 €|
|  4|2021-01-02|09:14|     150041.0|    PAIN AU CHOCOLAT|     2.0|    1,20 €|
|  5|2021-01-02|09:14|     150041.0|                PAIN|     1.0|    1,15 €|
|  8|2021-01-02|09:25|     150042.0|TRADITIONAL BAGUETTE|     5.0|    1,20 €|
| 11|2021-01-02|09:25|     150043.0|            BAGUETTE|     2.0|    0,90 €|
| 12|2021-01-02|09:25|     150043.0|           CROISSANT|     3.0|    1,10 €|
| 15|2021-01-02|09:27|     150044.0|             BANETTE|     1.0|    1,05 €|
| 18|2021-01-02|09:32|     150045.0|TRADITIONAL BAGUETTE|     3.0|    1,20 €|
| 19|2021-01-02|09:32|     150045.0|           CROISSANT|     6.

In [None]:
# Basic summary statistics for each column.
# describe() only returns a new DataFrame; without show() the cell displays
# just the DataFrame's repr and the statistics are never computed or printed.
df_bakery.describe().show()

DataFrame[summary: string, _c0: string, date: string, time: string, ticket_number: string, article: string, Quantity: string, unit_price: string]

In [None]:
# Return the first 5 rows to the driver as a list of Row objects.
df_bakery.head(5)

[Row(_c0=0, date='2021-01-02', time='08:38', ticket_number=150040.0, article='BAGUETTE', Quantity=1.0, unit_price='0,90 €'),
 Row(_c0=1, date='2021-01-02', time='08:38', ticket_number=150040.0, article='PAIN AU CHOCOLAT', Quantity=3.0, unit_price='1,20 €'),
 Row(_c0=4, date='2021-01-02', time='09:14', ticket_number=150041.0, article='PAIN AU CHOCOLAT', Quantity=2.0, unit_price='1,20 €'),
 Row(_c0=5, date='2021-01-02', time='09:14', ticket_number=150041.0, article='PAIN', Quantity=1.0, unit_price='1,15 €'),
 Row(_c0=8, date='2021-01-02', time='09:25', ticket_number=150042.0, article='TRADITIONAL BAGUETTE', Quantity=5.0, unit_price='1,20 €')]

In [None]:
# Count the total number of rows in the DataFrame.
df_bakery.count()

50793

In [None]:
# Project a subset of columns by name and print the first rows.
# Equivalent form using Column objects: df_bakery.select(col('date'), col('time')).show()
df_bakery.select('date', 'time').show()

+----------+-----+
|      date| time|
+----------+-----+
|2021-01-02|08:38|
|2021-01-02|08:38|
|2021-01-02|09:14|
|2021-01-02|09:14|
|2021-01-02|09:25|
|2021-01-02|09:25|
|2021-01-02|09:25|
|2021-01-02|09:27|
|2021-01-02|09:32|
|2021-01-02|09:32|
|2021-01-02|09:37|
|2021-01-02|09:37|
|2021-01-02|09:37|
|2021-01-02|09:39|
|2021-01-02|09:40|
|2021-01-02|09:40|
|2021-01-02|09:41|
|2021-01-02|09:46|
|2021-01-02|09:48|
|2021-01-02|09:48|
+----------+-----+
only showing top 20 rows



In [None]:
# Return 5 of the distinct ticket_number values as a list of Row objects.
# NOTE(review): no ordering is requested, so which 5 values come back is
# presumably arbitrary -- add an orderBy() if a stable sample matters.
df_bakery.select('ticket_number').distinct().head(5)

[Row(ticket_number=150828.0),
 Row(ticket_number=150859.0),
 Row(ticket_number=150867.0),
 Row(ticket_number=150903.0),
 Row(ticket_number=150963.0)]

In [None]:
# Count how many distinct ticket_number values the DataFrame contains.
df_bakery.select('ticket_number').distinct().count()

29565

In [None]:
# Filter rows by a column condition and print the matches.
# show() prints the table and returns None, so its result is deliberately not
# bound to a variable (binding it would leave a name holding None, not a DataFrame).
df_bakery.filter(df_bakery.article == 'BAGUETTE').show()

+---+----------+-----+-------------+--------+--------+----------+
|_c0|      date| time|ticket_number| article|Quantity|unit_price|
+---+----------+-----+-------------+--------+--------+----------+
|  0|2021-01-02|08:38|     150040.0|BAGUETTE|     1.0|    0,90 €|
| 11|2021-01-02|09:25|     150043.0|BAGUETTE|     2.0|    0,90 €|
|102|2021-01-02|10:24|     150066.0|BAGUETTE|     1.0|    0,90 €|
|155|2021-01-02|10:51|     150079.0|BAGUETTE|     1.0|    0,90 €|
|188|2021-01-02|10:58|     150087.0|BAGUETTE|     2.0|    0,90 €|
|194|2021-01-02|11:06|     150089.0|BAGUETTE|     1.0|    0,90 €|
|209|2021-01-02|11:14|     150092.0|BAGUETTE|     2.0|    0,90 €|
|254|2021-01-02|11:22|     150102.0|BAGUETTE|     1.0|    0,90 €|
|273|2021-01-02|11:29|     150107.0|BAGUETTE|     2.0|    0,90 €|
|294|2021-01-02|11:34|     150112.0|BAGUETTE|     2.0|    0,90 €|
|309|2021-01-02|11:45|     150116.0|BAGUETTE|     4.0|    0,90 €|
|313|2021-01-02|11:46|     150117.0|BAGUETTE|     1.0|    0,90 €|
|341|2021-

In [None]:
# Build a reusable subset: keep only the BAGUETTE rows and just the
# ticket_number and article columns, then print the first rows.
baguette_tickets = (
    df_bakery
    .filter(df_bakery['article'] == 'BAGUETTE')
    .select('ticket_number', 'article')
)
baguette_tickets.show()

+-------------+--------+
|ticket_number| article|
+-------------+--------+
|     150040.0|BAGUETTE|
|     150043.0|BAGUETTE|
|     150066.0|BAGUETTE|
|     150079.0|BAGUETTE|
|     150087.0|BAGUETTE|
|     150089.0|BAGUETTE|
|     150092.0|BAGUETTE|
|     150102.0|BAGUETTE|
|     150107.0|BAGUETTE|
|     150112.0|BAGUETTE|
|     150116.0|BAGUETTE|
|     150117.0|BAGUETTE|
|     150124.0|BAGUETTE|
|     150128.0|BAGUETTE|
|     150141.0|BAGUETTE|
|     150142.0|BAGUETTE|
|     150148.0|BAGUETTE|
|     150150.0|BAGUETTE|
|     150151.0|BAGUETTE|
|     150152.0|BAGUETTE|
+-------------+--------+
only showing top 20 rows

