# Transform Data with Spark in Azure Synapse Analytics

Spark lets you work with and manipulate raw data to turn it into useful information.

In this notebook, you'll consolidate heterogeneous sources into a homogeneous output stored as partitioned Parquet files. Data scientists and data analysts can then consume these files for further analysis.

> **Note**: This notebook is designed to be run in an Azure Synapse Analytics Spark pool.


## Attach this notebook to a Spark pool

To run the code in this notebook, you need a Spark pool. At the top of this notebook, in the **Attach to** list, select your Spark pool.

## Explore Data

Before training a model, a data engineer explores the data to make sure its profile matches what is expected, which is usually described in a technical specification. Notebooks let data professionals place these specifications in the notebook itself, which enables much greater collaboration across the organization.

In this example, you'll explore some historical sales order data, which you'll later transform into a denormalized structure and store on disk for consumption by other data professionals downstream.


### Load Data Using an Explicit Schema

Let's start by loading some historical Sales Order data into a dataframe. If the structure of the data is known ahead of time, you can explicitly specify the schema for the dataframe.

Review the code in the cell below, which defines a schema for Sales Order data before loading it from all of the CSV files in the data directory. Then click the **&#9655;** button to the left of the cell to run it.

> **Note**: The first time you run a cell in a notebook, the Spark pool must be started, which can take several minutes.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Explicit schema for the Sales Order CSV files (prices are numeric so they can be aggregated)
OrderSchema = StructType([
  StructField("SalesOrderNumber", StringType(), False),
  StructField("SalesOrderLineNumber", IntegerType(), False),
  StructField("OrderDate", DateType(), False),
  StructField("CustomerName", StringType(), False),
  StructField("EmailAddress", StringType(), False),
  StructField("Item", StringType(), False),
  StructField("Quantity", IntegerType(), False),
  StructField("UnitPrice", FloatType(), False),
  StructField("TaxAmount", FloatType(), False)
])

# Load every CSV file in /data using the schema above
OrderDetails = spark.read.csv('/data/*.csv', schema=OrderSchema, header=True)

# Rather than display the entire dataset, look at the first 20 rows
display(OrderDetails.limit(20))
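
Conceptually, an explicit schema tells the reader how to convert each text field into a typed value. The plain-Python sketch below (no Spark required) mimics that for a few hypothetical rows: each column has a converter, and a field that fails to convert becomes `None`, loosely like Spark's default PERMISSIVE mode setting unparseable fields to null. The `apply_schema` helper, the reduced column list, and the sample values are illustrative assumptions, not part of the lab data.

```python
import csv
import io
from datetime import date

# Hypothetical schema: (column name, converter) pairs mirroring a few OrderSchema fields
SCHEMA = [
    ("SalesOrderNumber", str),
    ("SalesOrderLineNumber", int),
    ("OrderDate", date.fromisoformat),
    ("Quantity", int),
    ("UnitPrice", float),
]

def apply_schema(text, schema, header=True):
    """Parse CSV text, converting each field with its schema converter.

    Fields that fail to convert become None, loosely like Spark's
    PERMISSIVE mode setting unparseable fields to null.
    """
    reader = csv.reader(io.StringIO(text))
    if header:
        next(reader, None)  # skip the header line, as header=True does
    rows = []
    for record in reader:
        row = {}
        for (name, convert), raw in zip(schema, record):
            try:
                row[name] = convert(raw)
            except ValueError:
                row[name] = None
        rows.append(row)
    return rows

# Hypothetical sample file: the second row has an unparseable date
sample = """SalesOrderNumber,SalesOrderLineNumber,OrderDate,Quantity,UnitPrice
SO00001,1,2019-07-01,1,3399.99
SO00002,1,not-a-date,2,24.99
"""

rows = apply_schema(sample, SCHEMA)
print(rows[0]["OrderDate"], rows[1]["OrderDate"])  # → 2019-07-01 None
```

Declaring the schema up front also avoids the extra scan that type inference needs, which matters more as the number of files grows.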



### Infer a Data Schema
If the structure of the data source is unknown, you can have Spark automatically infer the schema.

In this case, you'll load the same sales order data without specifying a schema.

Run the following cell to load the sales order data from the CSV files, reading the column names from the header row and inferring the data types automatically.

In [None]:
# Despite its name, OrderDetails2019 reads every CSV file in /data and lets Spark infer the column types
OrderDetails2019 = spark.read.csv('/data/*.csv', header=True, inferSchema=True)
OrderDetails2019.printSchema()
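
Spark has to look at the values before it can pick types, which is why `inferSchema` costs an extra pass over the data. The plain-Python sketch below is a deliberately simplified model of the idea: for each column it picks the narrowest of `int`, `double`, or `string` that fits every non-empty value. Spark's actual inference considers more types (for example timestamps and booleans) and its exact rules can vary between versions; the `infer_schema` helper and the sample values are hypothetical.

```python
def _is_int(value):
    try:
        int(value)
        return True
    except ValueError:
        return False

def _is_float(value):
    try:
        float(value)
        return True
    except ValueError:
        return False

# Candidate types from narrowest to widest (a simplified assumption, not Spark's full list)
CANDIDATES = [("int", _is_int), ("double", _is_float)]

def infer_column_type(values):
    """Return the narrowest candidate type that fits every non-empty value."""
    non_empty = [v for v in values if v != ""]
    for type_name, fits in CANDIDATES:
        if non_empty and all(fits(v) for v in non_empty):
            return type_name
    return "string"  # fallback when nothing narrower fits

def infer_schema(header, records):
    columns = list(zip(*records))  # transpose rows into columns
    return {name: infer_column_type(col) for name, col in zip(header, columns)}

# Hypothetical sample rows
header = ["SalesOrderNumber", "Quantity", "UnitPrice"]
records = [["SO00001", "1", "3399.99"], ["SO00002", "2", "24.99"]]
print(infer_schema(header, records))
```

Because one unexpected value widens a whole column (a single `"n/a"` turns an `int` column into `string`), an inferred schema is worth checking with `printSchema()` before relying on it.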

### Count the Rows in a Dataframe
Now that you're familiar with working with dataframes, a key task when building predictive solutions is to explore the data and determine statistics that help you understand it before building predictive models. For example, how many rows of sales order data do you actually have?

In [None]:
# count() is an action: Spark reads every file matched by /data/*.csv to count the rows
OrderDetails.count()

### Count inferred rows
This cell runs the same count against the dataframe with the inferred schema, which reads all of the files with a .csv extension in the same folder. The count should match the one above.

In [None]:
# This count also spans every CSV file loaded into the dataframe, not a single file
OrderDetails2019.count()

### Remove any headers from the consolidation of the source files

In [None]:
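# Keep only data rows: drop any header lines that were read in as data when the source files were combined
OrderDetails = OrderDetails.filter(OrderDetails.SalesOrderNumber != "SalesOrderNumber")
display(OrderDetails.limit(20))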
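
When several CSV files are combined and their header lines are read as data, each file contributes one header row whose SalesOrderNumber value is the literal text `SalesOrderNumber`. Removing them means keeping the rows that are *not* equal to that text; an equality test would keep only the header rows. The plain-Python sketch below models this with three hypothetical in-memory files.

```python
# Hypothetical contents of three CSV files, each starting with the same header line
files = [
    "SalesOrderNumber,Quantity\nSO00001,1\n",
    "SalesOrderNumber,Quantity\nSO00002,2\nSO00003,1\n",
    "SalesOrderNumber,Quantity\nSO00004,5\n",
]

# Naive consolidation: every line of every file, headers included
# (a plain split is enough here because the sample has no quoted commas)
combined = [line.split(",") for text in files for line in text.splitlines()]

# An equality test selects only the header rows, one per file
header_rows = [row for row in combined if row[0] == "SalesOrderNumber"]

# A not-equal test keeps the actual data rows
data_rows = [row for row in combined if row[0] != "SalesOrderNumber"]

print(len(combined), len(header_rows), len(data_rows))  # → 7 3 4
```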

In [None]:
print('Total rows in dataframe where \
EmailAddress = morgan30@adventure-works.com with where clause')
print(OrderDetails.where(OrderDetails.EmailAddress == 'morgan30@adventure-works.com').count())
  
print('They are  ')
OrderDetails.where(OrderDetails.EmailAddress == 'morgan30@adventure-works.com').show()

In [None]:
print('Total rows in dataframe where SalesOrderNumber = SO43705 with where clause')
print(OrderDetails.where(OrderDetails.SalesOrderNumber == 'SO43705').count())
  
print('They are  ')
OrderDetails.where(OrderDetails.SalesOrderNumber == 'SO45347').show()

### Use Dataframe Methods
Spark DataFrames provide functions that you can use to extract and manipulate data. For example, you can use the **select** function to return a new dataframe containing columns selected from an existing dataframe.

In [None]:
# Deriving new dataframes from an existing one is a quick way to create working tables.
# Keep track of them as you go; they make it easy for a data engineer to try different analysis methods.
OrderDetailSQL = OrderDetails.select("CustomerName", "OrderDate", "SalesOrderNumber", "Item", "Quantity", "UnitPrice", "TaxAmount")
display(OrderDetailSQL.limit(20))

### Split Customer Name into more searchable format
Splitting a customer's full name into first and last name is a common data transformation that makes searching easier. The following code takes the OrderDetails dataframe defined earlier, appends FirstName and LastName columns split from the CustomerName column, and shows the result (withColumn returns a new dataframe, so OrderDetails itself is not modified).

In [None]:
# Split CustomerName on a space into FirstName and LastName columns
from pyspark.sql.functions import split, col

name_parts = split(col("CustomerName"), " ")
OrderDetails \
    .withColumn("FirstName", name_parts.getItem(0)) \
    .withColumn("LastName", name_parts.getItem(1)) \
    .show()
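
`split(col("CustomerName"), " ")` returns an array, and `getItem(0)` / `getItem(1)` pick its first and second elements. The plain-Python sketch below mimics that on a few hypothetical names to show two edge cases: a one-word name has no second element (Spark's `getItem` returns null for an out-of-range index in non-ANSI mode, the Spark 3.x default), and a three-word name puts the middle word into LastName. Spark's `split` treats the pattern as a regular expression, whereas this sketch uses Python's plain `str.split`; the `split_name` helper is hypothetical.

```python
def split_name(customer_name):
    """Mimic split(CustomerName, " ") followed by getItem(0) and getItem(1).

    An out-of-range index yields None, like the null getItem returns in non-ANSI mode.
    """
    parts = customer_name.split(" ")  # always has at least one element
    first = parts[0]
    last = parts[1] if len(parts) > 1 else None
    return first, last

# Hypothetical names showing the edge cases
for name in ["Jane Doe", "Cher", "Mary Ann Smith"]:
    print(name, "->", split_name(name))
```

If the last word should be the last name instead, `element_at(split(col("CustomerName"), " "), -1)` is one option in PySpark; which rule is right depends on how names are stored in the data.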

### SparkSQL
Now, let's look at how to query the data using a language more familiar to some data engineers: SQL. Start by creating a view from a Spark dataframe.

In [None]:
# Reuse the name-splitting code above to build the dataframe behind the SQL view
from pyspark.sql.functions import split, col

name_parts = split(col("CustomerName"), " ")
temp_df = OrderDetails \
    .withColumn("FirstName", name_parts.getItem(0)) \
    .withColumn("LastName", name_parts.getItem(1))

#### Create the view from the temporary dataframe

In [None]:
temp_df.createOrReplaceTempView("SQLOrderDetails")

In [None]:
%%sql
SELECT LastName, FirstName, count(SalesOrderNumber)
FROM SQLOrderDetails
GROUP BY LastName, FirstName
HAVING count(SalesOrderNumber) > 1

View the output as a chart, and set the view options as follows:

- **Chart type**: Line chart
- **Key**: LastName
- **Values**: count(SalesOrderNumber)
- **Series Group**: *blank*
- **Aggregation**: Sum

The line chart shows the number of order lines (rows) per customer last name, for customers with more than one order line. Data scientists can enrich this data further and look for correlations using different analysis techniques.
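
To see what the `GROUP BY ... HAVING` query computes without a Spark pool, the sketch below runs the same query shape in Python's built-in `sqlite3` module against a few hypothetical rows. SQLite is only a stand-in here: this particular query is written the same way in Spark SQL, but the two engines differ in many other respects. It also shows that `count(SalesOrderNumber)` counts order lines, while `count(DISTINCT SalesOrderNumber)` counts orders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE SQLOrderDetails (SalesOrderNumber TEXT, FirstName TEXT, LastName TEXT)"
)
# Hypothetical order lines: Jane Doe has one order with two lines, Sam Roe has one line
conn.executemany(
    "INSERT INTO SQLOrderDetails VALUES (?, ?, ?)",
    [
        ("SO00001", "Jane", "Doe"),
        ("SO00001", "Jane", "Doe"),
        ("SO00002", "Sam", "Roe"),
    ],
)

# Same shape as the notebook's query: count rows per customer, keep counts above 1
query = """
SELECT LastName, FirstName, count(SalesOrderNumber)
FROM SQLOrderDetails
GROUP BY LastName, FirstName
HAVING count(SalesOrderNumber) > 1
"""
rows_over_one = conn.execute(query).fetchall()
print(rows_over_one)  # → [('Doe', 'Jane', 2)]

# count(DISTINCT ...) counts orders rather than order lines
distinct_query = """
SELECT LastName, FirstName, count(DISTINCT SalesOrderNumber)
FROM SQLOrderDetails
GROUP BY LastName, FirstName
"""
per_customer = sorted(conn.execute(distinct_query).fetchall())
print(per_customer)  # → [('Doe', 'Jane', 1), ('Roe', 'Sam', 1)]
```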


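### Show Duplicate Rows from the dataset, if any

In [None]:
# Flag duplicate rows in PySpark: group on every column and mark groups that occur more than once
import pyspark.sql.functions as f

dup_flags = temp_df.groupBy(temp_df.columns).agg(
    (f.count("*") > 1).cast("int").alias("Duplicate_indicator")
)

# Join the flags back onto the rows and keep only the duplicates.
# Note: rows with a null in any column won't match in this equi-join.
temp_df.join(dup_flags, on=temp_df.columns, how="inner") \
    .filter(f.col("Duplicate_indicator") == 1) \
    .show()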
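
The duplicate check groups on every column and flags any group with more than one row. The plain-Python sketch below shows the same idea with `collections.Counter` on hypothetical rows represented as tuples of all column values; it is an illustration of the logic, not the PySpark code itself.

```python
from collections import Counter

# Hypothetical rows (tuples of every column value); the first row appears twice
rows = [
    ("SO00001", 1, "Jane Doe"),
    ("SO00001", 1, "Jane Doe"),
    ("SO00002", 1, "Sam Roe"),
]

# Group on every column and count occurrences (like groupBy(all columns) + count)
counts = Counter(rows)

# Flag each row with 1 if its full set of values occurs more than once, else 0
flagged = [(row, int(counts[row] > 1)) for row in rows]
duplicates = [row for row, flag in flagged if flag == 1]

print(flagged)
print(duplicates)
```

To remove duplicates instead of flagging them, PySpark's `DataFrame.dropDuplicates()` keeps one copy of each distinct row.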

### Partition the data by OrderDate and CustomerName
The following code writes temp_df as a set of Parquet files partitioned by OrderDate and CustomerName, so each distinct combination of the two values gets its own folder. Parquet is a compressed, columnar format, and partitioning lets queries that filter on OrderDate or CustomerName skip folders that don't match, which improves performance when working with the data in a distributed file system.

In [None]:
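# mode('overwrite') lets this cell be rerun; the default mode fails if the output folder already exists
temp_df.write.mode('overwrite').partitionBy('OrderDate', 'CustomerName').parquet('OrderDetailsExpanded')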
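
`partitionBy` lays the output out as Hive-style `column=value` folders, and the partition columns are encoded in the folder names rather than stored inside the data files. The plain-Python sketch below computes where a few hypothetical rows would land under `OrderDetailsExpanded`; it skips the escaping Spark applies to special characters in partition values, and the `partition_path` helper is hypothetical.

```python
from collections import defaultdict

PARTITION_COLS = ["OrderDate", "CustomerName"]

def partition_path(row, partition_cols):
    """Build a Hive-style key=value folder path (no character escaping in this sketch)."""
    return "/".join(f"{name}={row[name]}" for name in partition_cols)

# Hypothetical rows
rows = [
    {"OrderDate": "2019-07-01", "CustomerName": "Jane Doe", "Item": "Item A"},
    {"OrderDate": "2019-07-01", "CustomerName": "Jane Doe", "Item": "Item B"},
    {"OrderDate": "2019-07-02", "CustomerName": "Sam Roe", "Item": "Item C"},
]

# Group rows by target folder; partition columns live in the path, not in the stored row
layout = defaultdict(list)
for row in rows:
    folder = "OrderDetailsExpanded/" + partition_path(row, PARTITION_COLS)
    stored = {k: v for k, v in row.items() if k not in PARTITION_COLS}
    layout[folder].append(stored)

for folder, stored_rows in sorted(layout.items()):
    print(folder, stored_rows)
```

Partitioning on a column with many distinct values, such as CustomerName, can create a large number of small folders and files, so it is worth checking the cardinality of each partition column before choosing it.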

In this notebook, you've used a Spark notebook to query data files in the data lake and perform some basic analysis with PySpark and Spark SQL. You then exported the results to Parquet, a format optimized for distributed and massively parallel processing (MPP) systems.

We've only scratched the surface of the power of notebooks. To learn more, see the [Apache Spark Notebooks Documentation](https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/apache-spark-development-using-notebooks).