# Working with Spark DataFrames


In [3]:

%%pyspark
df = spark.read.load(
    'Files/products_checked.csv',
    format = 'csv',
    header = True
)
# you can display the dataframe and limit it to any number of rows
display(df.limit(8))

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, f098a603-0a86-4eff-a529-e01415259020)

In [5]:
%%spark

// Load the products CSV, treating its first row as the column names.
val df = spark.read.
  option("header", "true").
  csv("Files/products_checked.csv")
// Preview at most 8 rows.
display(df.limit(8))

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, 59702da4-a7b9-4ac9-8e56-c68066cf796f)


df: org.apache.spark.sql.DataFrame = [product_id: string, product_name: string ... 3 more fields]


# Specifying an explicit schema
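- When a CSV file has no header row, you can specify the schema (column names and data types) explicitly.
- An explicit schema also avoids the extra pass over the data that schema inference (`inferSchema`) requires, and gives each column the intended type.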

In [6]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Column names and types for the header-less file, listed in file column order.
productSchema = (
    StructType()
    .add("product_id", IntegerType())
    .add("product_name", StringType())
    .add("aisle_id", IntegerType())
    .add("department_id", IntegerType())
    .add("prices", FloatType())
)

# Read the CSV with the schema above; header=False because the file has no header row.
df = (
    spark.read
    .format('csv')
    .schema(productSchema)
    .option('header', False)
    .load('Files/products_NoHeader.csv')
)
display(df.limit(5))

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 19bca070-4757-4a6d-892d-06f7c75d6da6)

### Filtering and grouping dataframes

In [8]:
# Build a price list: keep only the product name and price columns.
# select() returns a new DataFrame; df itself is left unchanged.
product_pricelist = df.select(["product_name", "prices"])
display(product_pricelist)

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, 7fb5ebfc-6943-458b-893d-cb839a14878e)

Columns can also be selected with Python's bracket (indexing) syntax, which returns a new DataFrame just like `select`.

In [10]:
# Bracket indexing with several column names is shorthand for select().
# The names are written exactly as declared in productSchema ("product_name"),
# so the lookup does not rely on case-insensitive column resolution.
pricelist = df["product_name", "prices"]
display(pricelist)

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, c78d352a-3094-4b64-ab42-87c2880bfb9a)

In [30]:
# List product name, department id and price for products priced
# above 24 dollars or below 1.5 dollars.
price_out_of_range = (df["prices"] > 24) | (df["prices"] < 1.5)
product_above24 = (
    df.select("product_name", "department_id", "prices")
    .filter(price_out_of_range)
)
display(product_above24)

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 33, Finished, Available)

SynapseWidget(Synapse.DataFrame, e864eb04-8f2a-4238-9277-4f3804fb33b0)

#####  Group by can be used to aggregate data

In [31]:
# Count the number of products in each department.
# groupBy() only needs the grouping column, so no prior select() is required;
# count() returns one row per department_id with a "count" column.
Product_dept = df.groupBy("department_id").count()
display(Product_dept)

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 34, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8f0c12d8-8b7a-4f70-bc42-b865237f3e2a)

#####  Saving the Dataframe
The following code example saves the DataFrame in Parquet format in the data lake, replacing any existing output at the same path.

In [28]:
# Persist the filtered products as Parquet, replacing any previous output at this path.
product_above24.write.parquet('Files/product_data/price24.parquet', mode="overwrite")

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 31, Finished, Available)

#### Partitioning the output file
Partitioning is an optimization technique that enables Spark to maximize performance across the worker nodes. More performance gains can be achieved when filtering data in queries by eliminating unnecessary disk IO.

The `partitionBy` method of the DataFrame writer is used here.

In [32]:
# Partition the output by department_id rather than prices: a partition column
# should have few distinct values, and prices is a continuous FloatType column
# that would produce a separate folder (and tiny files) for every distinct price.
product_above24.write.partitionBy("department_id").mode("overwrite").parquet("Files/new_productlist")

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 35, Finished, Available)

In [None]:
# Template (commented out, not executed): writing a DataFrame partitioned by a column.
# NOTE(review): bikes_df and "Category" appear to come from a different sample dataset —
# substitute a DataFrame and column from this notebook before running.
# bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

In [None]:



# Template (commented out, not executed): loading a single partition back into a DataFrame
# by pointing the reader at the partition folder (<column>=<value>).
# NOTE(review): the 'Files/bike_data' path appears to belong to a different sample dataset —
# adapt the path to data written by this notebook before running.
# road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
# display(road_bikes_df.limit(5))

# Using Spark SQL API to query data

In [36]:
# Register df as a temporary view so it can be queried by name from Spark SQL.
df.createOrReplaceTempView(name="products_view")

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 39, Finished, Available)

In [37]:
# Save df as a Delta table named "products".
# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write raises an error once the table already exists.
df.write.format("delta").mode("overwrite").saveAsTable("products")

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 40, Finished, Available)

In [None]:
# Template (commented out, not executed): querying a table through spark.sql().
# NOTE(review): ProductID, ProductName, ListPrice and Category are not columns of the
# productSchema used in this notebook — adapt the column names before running.
# bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
#                       FROM products \
#                       WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
# display(bikes_df)

In [41]:
# List product name, department id and price for departments 3 and 4.
# The query reads the products_view temporary view registered from df above,
# instead of a products_checked table that the preceding cells do not create.
product_3_4 = spark.sql("""
    SELECT product_name, department_id, prices
    FROM products_view
    WHERE department_id IN (3, 4)
""")
display(product_3_4)

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 44, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0eb1b134-fbb5-4543-b347-7021d9c3c349)

# Using SQL Code

In [50]:
%%sql
-- The double-percent sql magic on the first line runs this cell as Spark SQL.
-- Count products per department, largest departments first.

SELECT department_id, COUNT(product_id) AS ProductCount
FROM products
WHERE department_id IS NOT NULL
GROUP BY department_id
ORDER BY ProductCount DESC;

StatementMeta(, 529ee762-a3de-49fc-8ba2-a4656c4d9f33, 53, Finished, Available)

<Spark SQL result set with 21 rows and 2 fields>