In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

In [2]:
spark = (SparkSession.builder.appName("TotalOrdersPerRegionCountry").getOrCreate())

22/02/15 14:35:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
type(spark)

pyspark.sql.session.SparkSession

In [4]:
sales_file = "data/sales_records.csv"

# sales_df is a dataframe. A dataframe is a distributed collection of data organized into rows and columns.
# When we are loading data into the dataframe we also need to infer a schema. 
# A schema is a structure that describes how the data is organised. 
# In csv file the very first line is header info which is the schema.
sales_df = (spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(sales_file))

sales_df.select("Region", "Country", "Order ID").show(n=10, truncate=False)

                                                                                

+---------------------------------+---------------------+---------+
|Region                           |Country              |Order ID |
+---------------------------------+---------------------+---------+
|Middle East and North Africa     |Azerbaijan           |535113847|
|Central America and the Caribbean|Panama               |874708545|
|Sub-Saharan Africa               |Sao Tome and Principe|854349935|
|Sub-Saharan Africa               |Sao Tome and Principe|892836844|
|Central America and the Caribbean|Belize               |129280602|
|Europe                           |Denmark              |473105037|
|Europe                           |Germany              |754046475|
|Middle East and North Africa     |Turkey               |772153747|
|Europe                           |United Kingdom       |847788178|
|Asia                             |Kazakhstan           |471623599|
+---------------------------------+---------------------+---------+
only showing top 10 rows



In [5]:
type(sales_df)

pyspark.sql.dataframe.DataFrame

In [6]:
# Selecting Region, Country and Order ID from our sales data frame.
# Group the data by Region and Country.
# Aggregate count function to count number of order id's.

count_sales_df = (sales_df.select("Region", "Country", "Order ID")
                  .groupBy("Region", "Country").agg(count("Order ID").alias("Total Orders"))
                  .orderBy("Total Orders", ascending=False))

count_sales_df.show(n=10, truncate=False)

#Printing total rows in csv file:
print("Total Rows = ", (count_sales_df.count()))

                                                                                

+---------------------------------+------------+------------+
|Region                           |Country     |Total Orders|
+---------------------------------+------------+------------+
|Sub-Saharan Africa               |Sudan       |623         |
|Australia and Oceania            |New Zealand |593         |
|Europe                           |Vatican City|590         |
|Europe                           |Malta       |589         |
|Sub-Saharan Africa               |Mozambique  |589         |
|Middle East and North Africa     |Tunisia     |584         |
|Asia                             |Cambodia    |584         |
|Central America and the Caribbean|Panama      |578         |
|Sub-Saharan Africa               |Rwanda      |576         |
|Sub-Saharan Africa               |South Africa|575         |
+---------------------------------+------------+------------+
only showing top 10 rows





Total Rows =  185


                                                                                

In [7]:
# A narrow transformation (filter method) is where output can be computed from a single input partition.
# Map, FlatMap, MapPartition, Filter, Sample & Union methods.
# Data is stored and distributed into partitions. Single input refers to each partition.
# The calculation of data operations can be carried out without the partitions having to exchange or shuffle data.

In [8]:
# Wide Transformations require input from other partitions, data shuffling is needed before processing.
# Intersection, Distinct, ReduceByKey, GroupByKey, Join, Cartesian, Repartition & Coalesce methods.
# Spark executors on worker nodes will shuffle data. Data is shuffled and arranged in a new partition.

In [10]:
# Transformations: orderBy(), groupBy(), filter(), select(), join()
# Actions: show(), take(), count(), collect(), save()
# Transformations always results in a new data frame. Actions don't result in a new dataframe/RDD.

In [11]:
# Spark has lazy evaluation. Lazy evaluation means the execution will not start until an action is triggered.
# Transformations are lazy in nature i.e. when we call some operation on an RDD, it does not execute immediately.