In [1]:
# Import the basic spark library
from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession, the entry point to the PySpark application.
# master() takes the URL of a remote Spark instance, or "local" to run on this machine.
spark = (
    SparkSession.builder
    .master("local")
    .appName("spark_tutorial")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/06 14:35:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<h4>Resilient Distributed Dataset (RDD)</h4>
<ul>
    <li>Fault tolerant</li>
    <li>Resilient</li>
    <li>Immutable</li>
    <li>Partitioned</li>
</ul>

<h4>Data Upload</h4>

In [2]:
# In-memory sample records: (pizza name, price, list of ingredients)
data = [
    ("Margherita", 5.95, ["Tomato Sauce", "Mozzarella Cheese", "Basil"]),
    ("Calzone", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
]

# Turn the Python list into an RDD through the session's SparkContext
rdd = spark.sparkContext.parallelize(data)

In [3]:
# Create an RDD from a text file; the path "menu.txt" is relative to the working directory
rdd_2 = spark.sparkContext.textFile("menu.txt")

<h4>Dataframe Creation</h4>

In [4]:
# Sample menu rows: (pizza name, price, list of ingredients)
df_data = [
    ("Margherita", 5.95, ["Tomato Sauce", "Mozzarella Cheese", "Basil"]),
    ("Calzone", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
    ("Diavola", 5.95, ["Tomato Sauce", "Mozzarella Cheese", "Spicy Salame"]),
    ("Prosciutto", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
    ("Speck & Brie", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Speck", "Brie"]),
    ("Tonno & Cipolle", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Tuna", "Onions"]),
    ("Fries", 3.95, ["Potatoes"]),
]

# Column names, given in the same order as the tuple fields
columns = ["Pizza Name", "Price", "Ingredients"]

# Build the DataFrame; only names are supplied, so the column types are inferred
df = spark.createDataFrame(df_data, columns)

In [6]:
# Display up to the first 20 rows (the default for show) without truncating long cell values
df.show(n=20, truncate=False)

+---------------+-----+---------------------------------------------------+
|Pizza Name     |Price|Ingredients                                        |
+---------------+-----+---------------------------------------------------+
|Margherita     |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Calzone        |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Diavola        |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Prosciutto     |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Speck & Brie   |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
|Fries          |3.95 |[Potatoes]                                         |
+---------------+-----+---------------------------------------------------+



In [8]:
# Load a DataFrame from a semicolon-delimited text file whose first line is the header
# NOTE(review): the recorded output shows header names with a leading space
# (" Price", " Ingredients") and every column read as string — confirm whether
# the file or the reader options should be adjusted.
df = (
    spark.read
    .option("header", True)
    .option("delimiter", ";")
    .csv("menu_csv.txt")
)

# Print the schema Spark detected for the file
df.printSchema()

# Display the loaded rows without truncating long values
df.show(truncate=False)

root
 |-- Pizza Name: string (nullable = true)
 |--  Price: string (nullable = true)
 |--  Ingredients: string (nullable = true)

+---------------+------+----------------------------------------------------------+
|Pizza Name     | Price| Ingredients                                              |
+---------------+------+----------------------------------------------------------+
|Margherita     | 5.95 | ["Tomato Sauce", "Mozzarella Cheese", "Basil"]           |
|Calzone        | 7.95 | ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]|
|Diavola        | 5.95 | ["Tomato Sauce", "Mozzarella Cheese", "Spicy Salame"]    |
|Prosciutto     | 7.95 | ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]|
|Speck & Brie   | 7.95 | ["Tomato Sauce", "Mozzarella Cheese", "Speck", "Brie"]   |
|Tonno & Cipolle| 7.95 | ["Tomato Sauce", "Mozzarella Cheese", "Tuna", "Onions"]  |
+---------------+------+----------------------------------------------------------+



<h4>Dataframes from RDDs</h4>

In [10]:
# Transform the RDD into a DataFrame; no column names are given here,
# so the columns get generated names (see the printed schema)
df_from_rdd = rdd.toDF()

# Print the schema of the DataFrame to inspect the inferred names and types
df_from_rdd.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: double (nullable = true)
 |-- _3: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [11]:
# Transform the RDD into a DataFrame, this time naming the columns explicitly
columns = ["Pizza Name", "Price", "Ingredients"]
df_from_rdd = rdd.toDF(schema=columns)

# The schema now carries the supplied column names
df_from_rdd.printSchema()

root
 |-- Pizza Name: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [12]:
# Alternative route: create the DataFrame from the RDD through the session,
# then assign the column names with toDF(*columns)
df_2_from_rdd = spark.createDataFrame(rdd).toDF(*columns)

# Print the schema of the DataFrame built in this cell
# (previously this printed df_from_rdd, so df_2_from_rdd was never inspected)
df_2_from_rdd.printSchema()

root
 |-- Pizza Name: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)



<h4>Custom Dataframe</h4>

In [13]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType

# Create the schema explicitly; each StructField takes (name, data type, nullable)
schema = StructType([
    StructField("Pizza Name", StringType(), True),
    StructField("Price", FloatType(), True),
    StructField("Ingredients", ArrayType(StringType()), True),
])

# Build the DataFrame with the explicit schema, then inspect its schema and rows
df = spark.createDataFrame(df_data, schema)
df.printSchema()
df.show(truncate=False)

root
 |-- Pizza Name: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Ingredients: array (nullable = true)
 |    |-- element: string (containsNull = true)

+---------------+-----+---------------------------------------------------+
|Pizza Name     |Price|Ingredients                                        |
+---------------+-----+---------------------------------------------------+
|Margherita     |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Calzone        |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Diavola        |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Prosciutto     |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Speck & Brie   |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
|Fries          |3.95 |[Potatoes]                                         |
+---------------+-----+---------------------------------------------------+

<h4>Organizing Data</h4>

In [46]:
# Sort the rows by the "Price" column (ascending order is the default)
# and display the result without truncating long values
df.sort("Price").show(truncate = False)

+---------------+-----+---------------------------------------------------+
|Pizza Name     |Price|Ingredients                                        |
+---------------+-----+---------------------------------------------------+
|Fries          |3.95 |[Potatoes]                                         |
|Margherita     |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Diavola        |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Calzone        |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Prosciutto     |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Speck & Brie   |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
+---------------+-----+---------------------------------------------------+



In [14]:
from pyspark.sql.functions import col

# Sort on several columns: by "Price" first, then by "Pizza Name" to break ties
# (both ascending, since no direction is specified)
df.sort(col("Price"), col("Pizza Name")).show(truncate = False)

+---------------+-----+---------------------------------------------------+
|Pizza Name     |Price|Ingredients                                        |
+---------------+-----+---------------------------------------------------+
|Fries          |3.95 |[Potatoes]                                         |
|Diavola        |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Margherita     |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Calzone        |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Prosciutto     |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Speck & Brie   |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
+---------------+-----+---------------------------------------------------+



In [15]:
# Sort with orderBy instead of sort: same columns, and the recorded output
# shows the same ordering as sort() with these columns
df.orderBy(col("Price"), col("Pizza Name")).show(truncate = False)

+---------------+-----+---------------------------------------------------+
|Pizza Name     |Price|Ingredients                                        |
+---------------+-----+---------------------------------------------------+
|Fries          |3.95 |[Potatoes]                                         |
|Diavola        |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Margherita     |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Calzone        |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Prosciutto     |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Speck & Brie   |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
+---------------+-----+---------------------------------------------------+



In [16]:
# Specify the sort direction explicitly for each column (works the same with orderBy):
# "Price" ascending, then "Pizza Name" descending within equal prices
df.sort(col("Price").asc(), col("Pizza Name").desc()).show(truncate = False)

+---------------+-----+---------------------------------------------------+
|Pizza Name     |Price|Ingredients                                        |
+---------------+-----+---------------------------------------------------+
|Fries          |3.95 |[Potatoes]                                         |
|Margherita     |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil]           |
|Diavola        |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame]    |
|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions]    |
|Speck & Brie   |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie]     |
|Prosciutto     |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
|Calzone        |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|
+---------------+-----+---------------------------------------------------+



<h4>Explode Arrays into Individual Rows</h4>

In [17]:
from pyspark.sql.functions import explode

# Produce one output row per array element: each pizza's name and price are
# repeated for every entry of its "Ingredients" array
exploded_df = df.select(
    col("Pizza Name"),
    col("Price"),
    explode(col("Ingredients")),
)

# Inspect the resulting schema and rows
exploded_df.printSchema()
exploded_df.show(truncate=False)

root
 |-- Pizza Name: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- col: string (nullable = true)

+---------------+-----+-----------------+
|Pizza Name     |Price|col              |
+---------------+-----+-----------------+
|Margherita     |5.95 |Tomato Sauce     |
|Margherita     |5.95 |Mozzarella Cheese|
|Margherita     |5.95 |Basil            |
|Calzone        |7.95 |Tomato Sauce     |
|Calzone        |7.95 |Mozzarella Cheese|
|Calzone        |7.95 |Prosciutto Cotto |
|Diavola        |5.95 |Tomato Sauce     |
|Diavola        |5.95 |Mozzarella Cheese|
|Diavola        |5.95 |Spicy Salame     |
|Prosciutto     |7.95 |Tomato Sauce     |
|Prosciutto     |7.95 |Mozzarella Cheese|
|Prosciutto     |7.95 |Prosciutto Cotto |
|Speck & Brie   |7.95 |Tomato Sauce     |
|Speck & Brie   |7.95 |Mozzarella Cheese|
|Speck & Brie   |7.95 |Speck            |
|Speck & Brie   |7.95 |Brie             |
|Tonno & Cipolle|7.95 |Tomato Sauce     |
|Tonno & Cipolle|7.95 |Mozzarella Cheese|

In [18]:
# Rename a column: give the "col" column a descriptive name, "Ingredient"
exploded_df = exploded_df.withColumnRenamed(existing="col", new="Ingredient")

# Confirm the new column name in the schema
exploded_df.printSchema()

root
 |-- Pizza Name: string (nullable = true)
 |-- Price: float (nullable = true)
 |-- Ingredient: string (nullable = true)

