<h4>Resilient Distributed Dataset (RDD)</h4>
<ul>
    <li>Fault tolerant</li>
    <li>Resilient</li>
    <li>Immutable</li>
    <li>Partitioned</li>
</ul>

<h4>Data Upload</h4>

In [None]:
# Load the contents of a text file into an RDD through the SparkContext.
# NOTE(review): "menu.txt" is a relative path — presumably resolved against the
# notebook's working directory / default filesystem; confirm the file ships with the notebook.
rdd_2 = spark.sparkContext.textFile("menu.txt")

In [None]:
# Build a DataFrame from in-memory rows.
# Each row is a tuple of (pizza name, price, list of ingredients).
df_data = [
    ("Margherita", 5.95, ["Tomato Sauce", "Mozzarella Cheese", "Basil"]),
    ("Calzone", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
    ("Diavola", 5.95, ["Tomato Sauce", "Mozzarella Cheese", "Spicy Salame"]),
    ("Prosciutto", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Prosciutto Cotto"]),
    ("Speck & Brie", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Speck", "Brie"]),
    ("Tonno & Cipolle", 7.95, ["Tomato Sauce", "Mozzarella Cheese", "Tuna", "Onions"]),
    ("Fries", 3.95, ["Potatoes"]),
]

# Only column names are supplied here; they map positionally onto each tuple.
columns = ["Pizza Name", "Price", "Ingredients"]
df = spark.createDataFrame(data=df_data, schema=columns)

In [None]:
# Print the DataFrame as a text table (show() displays at most the first 20 rows by default)
df.show()

<h4>Dataframes from RDDs</h4>

In [None]:
# Build a DataFrame from an RDD, then name its columns with toDF().
# NOTE(review): `rdd` is not defined in the cells visible here — presumably it is
# created in an earlier cell and holds 3-field records matching `columns`; confirm.
df_2_from_rdd = spark.createDataFrame(rdd).toDF(*columns)

# Inspect the schema of the DataFrame created on the line above.
df_2_from_rdd.printSchema()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType

# Declare the schema explicitly; each StructField takes (name, data type, nullable).
schema = StructType(
    [
        StructField("Pizza Name", StringType(), True),
        StructField("Price", FloatType(), True),
        StructField("Ingredients", ArrayType(StringType()), True),
    ]
)

# Rebuild `df` from the same rows, this time with the explicit schema
# (this rebinds `df`, replacing the DataFrame built from column names only).
df = spark.createDataFrame(data=df_data, schema=schema)
df.printSchema()
df.show(truncate=False)

<h4>Organizing Data</h4>

In [None]:
from pyspark.sql.functions import col
# Sort by Price first, then by Pizza Name to break ties between equal prices
(
    df
    .sort(col("Price"), col("Pizza Name"))
    .show(truncate=False)
)

In [None]:
# Spell out the sort direction per column: Price ascending, Pizza Name descending
# (orderBy works the same way as sort)
(
    df
    .sort(
        col("Price").asc(),
        col("Pizza Name").desc(),
    )
    .show(truncate=False)
)

In [None]:
# The same sorting could also be written as raw SQL.
# No spoilers -> we'll see how to use it later on.

<h4>Explode Arrays into Individual Rows</h4>

In [None]:
from pyspark.sql.functions import explode

# Produce one output row per element of the Ingredients array,
# carrying the pizza's name and price along on every row.
exploded_df = df.select(
    col("Pizza Name"),
    df["Price"],
    explode(df["Ingredients"]),
)
exploded_df.printSchema()
exploded_df.show(truncate=False)

In [None]:
# How can we rename a column?
# withColumnRenamed returns a new DataFrame, so keep that result in `exploded_df`.
# printSchema() only prints and returns None, so it must be a separate statement —
# chaining it into the assignment would rebind `exploded_df` to None.
exploded_df = exploded_df.withColumnRenamed("col", "Ingredient")
exploded_df.printSchema()