# Transform Data with Spark in Azure Synapse Analytics

Spark lets you explore, manipulate, and transform raw data into useful information.

In this notebook, you'll consolidate heterogeneous sources into a single, homogeneous output stored in partitioned Parquet format. Data scientists and data analysts can then consume this output for further analysis.

> **Note**: This notebook is designed to be run in an Azure Synapse Analytics Spark pool.


## Attach this notebook to a Spark pool

To run the code in this notebook, you'll need a Spark pool, so at the top of this notebook, in the **Attach to** list, select your Spark pool.

## Explore Data

Before training a model, a data engineer explores the data to ensure that its profile matches what is expected, which is usually documented in a technical specification. Notebooks, though, let data professionals place these specifications within the notebook itself, which allows for much greater collaboration throughout the organization.

In this example, you'll explore some historical sales order and flight data, which we'll later transform into a denormalized structure and store on disk for consumption by other data professionals downstream.


### Load Data Using an Explicit Schema

Let's start by loading some historical Sales Order data into a dataframe. If the structure of the data is known ahead of time, you can explicitly specify the schema for the dataframe.

Review the code in the cell below, which defines a schema for Sales Order data before loading it from all of the CSV files in the data directory. Then click the **&#9655;** button to the left of the cell to run it.

> **Note**: The first time you run a cell in a notebook, the Spark pool must be started; which can take several minutes.

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

OrderSchema = StructType([
  StructField("SalesOrderNumber", StringType(), False),
  StructField("SalesOrderLineNumber", IntegerType(), False),
  StructField("OrderDate", DateType(), False),
  StructField("CustomerName", StringType(), False),
  StructField("EmailAddress", StringType(), False),
  StructField("Item", StringType(), False),
  StructField("Quantity", IntegerType(), False),
  StructField("UnitPrice", FloatType(), False),
  StructField("TaxAmount", FloatType(), False)
])

# Display only the first 20 rows rather than the entire dataset
OrderDetails = spark.read.csv('/data/*.csv', schema=OrderSchema, header=True)
display(OrderDetails.limit(20))
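To make the role of an explicit schema concrete, here is a minimal plain-Python sketch (no Spark involved) of parsing CSV text against declared column types. The `ORDER_SCHEMA` list, the `parse_with_schema` helper, and the sample rows are hypothetical and cover only a few columns. Spark's CSV reader does much more, but the idea is similar: each field is converted to its declared type, and a value that cannot be converted becomes null, loosely like Spark's default `PERMISSIVE` mode.

```python
import csv
import io
from datetime import date

# Hypothetical, simplified schema: (column name, converter) pairs.
# This mimics the idea behind StructType/StructField, not Spark's parser.
ORDER_SCHEMA = [
    ("SalesOrderNumber", str),
    ("SalesOrderLineNumber", int),
    ("OrderDate", date.fromisoformat),
    ("Quantity", int),
    ("UnitPrice", float),
]

def parse_with_schema(text, schema):
    """Parse headerless CSV text, converting each field with its declared type.

    Values that fail to convert become None, loosely like Spark's
    PERMISSIVE mode setting unparseable fields to null.
    """
    rows = []
    for record in csv.reader(io.StringIO(text)):
        row = {}
        for (name, convert), raw in zip(schema, record):
            try:
                row[name] = convert(raw)
            except ValueError:
                row[name] = None
        rows.append(row)
    return rows

# Made-up sample rows; the second has an invalid price
sample = "SO43701,1,2019-07-01,1,3399.99\nSO43704,1,2019-07-01,1,not-a-price\n"
rows = parse_with_schema(sample, ORDER_SCHEMA)
print(rows[0]["UnitPrice"], rows[1]["UnitPrice"])  # 3399.99 None
```

Declaring types up front means bad values surface as nulls you can count and clean, rather than silently arriving as strings.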



### Infer a Data Schema
If the structure of the data source is unknown, you can have Spark automatically infer the schema.

In this case, you will load the same sales order data without specifying a schema.

Run the following cell to load the data from the CSV files, inferring the column names and data types automatically.

In [None]:
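# Read column names from the header row and infer each column's data type
OrderDetails2019 = spark.read.csv('/data/*.csv', header=True, inferSchema=True)
OrderDetails2019.printSchema()
display(OrderDetails2019.limit(20))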
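Schema inference has to look at the data before it can assign types, so it costs an extra pass over the files; that is one reason an explicit schema is usually preferred for large datasets. The plain-Python sketch below shows the basic idea with a hypothetical `infer_type` helper that picks the narrowest of three types that fits every value in a column. It is a simplified illustration only: Spark's `inferSchema` also considers booleans, dates, and timestamps, and treats empty values as nulls.

```python
def infer_type(values):
    """Return the narrowest type ("int", "double", or "string") that fits every value."""
    def fits(convert):
        try:
            for value in values:
                convert(value)
            return True
        except ValueError:
            return False

    if fits(int):
        return "int"
    if fits(float):
        return "double"
    return "string"

def infer_schema(header, records):
    """Map each header name to the type inferred from that column's values."""
    columns = list(zip(*records))  # transpose rows into columns
    return {name: infer_type(column) for name, column in zip(header, columns)}

# Made-up sample data
header = ["SalesOrderNumber", "Quantity", "UnitPrice"]
records = [["SO43701", "1", "3399.99"], ["SO43704", "2", "3374.99"]]
print(infer_schema(header, records))
# {'SalesOrderNumber': 'string', 'Quantity': 'int', 'UnitPrice': 'double'}
```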


### Count the Rows in a Dataframe
Now that you're familiar with working with dataframes, a key task when building predictive solutions is to explore the data, determining statistics that will help you understand the data before building predictive models. For example, how many rows of sales order data do you actually have?

In [None]:
# Count the rows across all of the files loaded into the dataframe
OrderDetails.count()


### Count Inferred Rows
This runs the same count on the dataframe whose schema was inferred, which was loaded from the same CSV files in the same folder. The count should be the same as the one above.

In [None]:
# Count the rows across all of the files loaded into the inferred dataframe
OrderDetails2019.count()

### Remove Header Rows from the Consolidated Source Files

In [None]:
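# Keep only rows whose SalesOrderNumber is not the literal column name (drops stray header rows)
OrderDetails = OrderDetails.filter(OrderDetails.SalesOrderNumber != "SalesOrderNumber")
display(OrderDetails.limit(20))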
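When several CSV files are consolidated, a header line can end up as an ordinary data row whose first field is the column name itself. Removing such rows means keeping only rows whose `SalesOrderNumber` differs from the literal string `"SalesOrderNumber"`. The plain-Python sketch below, with a hypothetical `drop_header_rows` helper and made-up rows, shows that idea. One difference from Spark: there, a comparison with null yields null, so a `!=` filter also drops rows whose `SalesOrderNumber` is null, which this list comprehension does not model.

```python
def drop_header_rows(rows, first_column="SalesOrderNumber"):
    """Keep only rows whose first field is not the literal column name."""
    return [row for row in rows if row[0] != first_column]

combined = [
    ["SalesOrderNumber", "SalesOrderLineNumber"],  # stray header from a second file
    ["SO43701", "1"],
    ["SO43704", "1"],
]
cleaned = drop_header_rows(combined)
print(len(combined), len(cleaned))  # 3 2
```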

In [None]:
print('Total rows in dataframe where EmailAddress = morgan30@adventure-works.com:')
print(OrderDetails.where(OrderDetails.EmailAddress == 'morgan30@adventure-works.com').count())

print('The matching rows are:')
OrderDetails.where(OrderDetails.EmailAddress == 'morgan30@adventure-works.com').show()

### Partition the Data by OrderDate and Item

In [None]:
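# One folder per OrderDate/Item value; overwrite mode allows the cell to be rerun
OrderDetails.write.mode('overwrite').partitionBy('OrderDate', 'Item').parquet('OrderDetails')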
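`partitionBy` writes a Hive-style directory tree, with one folder level per partition column, such as `OrderDetails/OrderDate=<date>/Item=<item>/`. Because each partition value becomes a directory, queries that filter on `OrderDate` or `Item` can skip directories that cannot match (partition pruning); partitioning on a high-cardinality pair of columns can also produce many small files. The plain-Python sketch below, with a hypothetical `partition_layout` helper and made-up rows, only illustrates how rows map to directories. It ignores Spark's escaping of special characters in partition values and the part files written inside each directory.

```python
from collections import defaultdict

def partition_layout(rows, root="OrderDetails", keys=("OrderDate", "Item")):
    """Group dict rows by Hive-style partition path (root/key=value/...)."""
    layout = defaultdict(list)
    for row in rows:
        path = "/".join([root] + [f"{key}={row[key]}" for key in keys])
        # Partition values live in the path, so they are not stored in the data files
        layout[path].append({k: v for k, v in row.items() if k not in keys})
    return dict(layout)

rows = [
    {"SalesOrderNumber": "SO43701", "OrderDate": "2019-07-01", "Item": "Mountain-100 Silver, 44"},
    {"SalesOrderNumber": "SO43702", "OrderDate": "2019-07-01", "Item": "Mountain-100 Silver, 44"},
    {"SalesOrderNumber": "SO43703", "OrderDate": "2019-07-02", "Item": "Road-150 Red, 62"},
]
for path, files in partition_layout(rows).items():
    print(path, len(files))
```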

In [None]:
print('Total rows in dataframe where SalesOrderNumber = SO45347:')
print(OrderDetails.where(OrderDetails.SalesOrderNumber == 'SO45347').count())

print('The matching rows are:')
OrderDetails.where(OrderDetails.SalesOrderNumber == 'SO45347').show()

### Use Dataframe Methods
Spark DataFrames provide functions that you can use to extract and manipulate data. For example, you can use the **select** function to return a new dataframe containing columns selected from an existing dataframe.

In [None]:
# Creating dataframes from another dataframe is a quick way to build working tables.
# You'll want to manage these as you go, but they make it easy to try different analysis methods.
customers = OrderDetails.select("CustomerName", "EmailAddress")
display(customers.limit(20))

### Combine Operations
You can combine functions in a single statement to perform multiple operations on a dataframe. In this case, you will use the **join** function to combine the **flights** and **airports** dataframes, use the **groupBy** and **count** functions to return the number of flights originating in each city, and finally use the **orderBy** function to sort the results in ascending order of flight count.

In [None]:
flightsByOrigin = flights.join(airports, flights.OriginAirportID == airports.airport_id).groupBy("city").count().orderBy("count")
display(flightsByOrigin.limit(30))
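The chained statement above performs three steps: an inner join (the default for `join` with a condition), a group-and-count, and an ascending sort. The plain-Python sketch below walks through the same steps on a handful of hypothetical rows that reuse the column names from the cell. It assumes `airport_id` is unique; in a real join, duplicate keys would multiply the matching rows.

```python
from collections import Counter

# Hypothetical sample rows using the column names from the cell above
flights = [
    {"OriginAirportID": 1},
    {"OriginAirportID": 1},
    {"OriginAirportID": 2},
    {"OriginAirportID": 3},  # no matching airport, so the inner join drops it
]
airports = [
    {"airport_id": 1, "city": "Seattle"},
    {"airport_id": 2, "city": "Chicago"},
]

# join: look up each flight's origin city (inner join; assumes unique airport_id)
city_by_id = {a["airport_id"]: a["city"] for a in airports}
joined_cities = [city_by_id[f["OriginAirportID"]] for f in flights
                 if f["OriginAirportID"] in city_by_id]

# groupBy("city").count()
counts = Counter(joined_cities)

# orderBy("count"): ascending by count
flights_by_origin = sorted(counts.items(), key=lambda item: item[1])
print(flights_by_origin)  # [('Chicago', 1), ('Seattle', 2)]
```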

### Determine the Presence of Duplicates
The data you have to work with won't always be perfect - often you'll want to *clean* the data; for example to detect and remove duplicates that might affect your model. You can use the **dropDuplicates** function to create a new dataframe with the duplicates removed, enabling you to determine how many rows are duplicates of other rows.

In [None]:
flights.count() - flights.dropDuplicates().count()
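The duplicate count is simply the total number of rows minus the number of distinct rows. A plain-Python sketch of that arithmetic, with a hypothetical `count_duplicates` helper and made-up rows represented as tuples so they can go into a set:

```python
def count_duplicates(rows):
    """Number of rows minus number of distinct rows (rows must be hashable, e.g. tuples)."""
    return len(rows) - len(set(rows))

# Hypothetical (DayofMonth, Carrier, ArrDelay) rows
rows = [(1, "DL", 5), (1, "DL", 5), (2, "AA", None), (2, "AA", None), (2, "AA", None)]
print(count_duplicates(rows))  # 3
```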

### Identify Missing Values
As well as checking for duplicates, you should detect missing values and either remove rows containing missing data or replace the missing values with a suitable replacement. The **dropna** function creates a dataframe with any rows containing missing data removed; you can specify a subset of columns, and whether a row should be removed if *any* or *all* of those values are missing. You can then use this new dataframe to determine how many rows contain missing values.

In [None]:
deduped = flights.dropDuplicates()  # compare deduplicated rows so duplicates aren't counted as missing
deduped.count() - deduped.dropna(how="any", subset=["ArrDelay", "DepDelay"]).count()
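The difference between `how="any"` and `how="all"` is easiest to see on a few rows. The plain-Python sketch below uses a hypothetical `dropna` helper over dict rows, treating `None` as missing. Note that Spark's `dropna` also treats NaN in numeric columns as missing, which this sketch does not model.

```python
def dropna(rows, how="any", subset=None):
    """Return dict rows without missing (None) values in the `subset` columns.

    how="any": drop a row if any checked value is missing.
    how="all": drop a row only if every checked value is missing.
    """
    if how not in ("any", "all"):
        raise ValueError("how must be 'any' or 'all'")

    def keep(row):
        columns = subset if subset is not None else list(row)
        missing = [row[c] is None for c in columns]
        return not (any(missing) if how == "any" else all(missing))

    return [row for row in rows if keep(row)]

rows = [
    {"ArrDelay": None, "DepDelay": 5},
    {"ArrDelay": None, "DepDelay": None},
    {"ArrDelay": 3, "DepDelay": 1},
]
subset = ["ArrDelay", "DepDelay"]
print(len(rows) - len(dropna(rows, "any", subset)))  # 2
print(len(rows) - len(dropna(rows, "all", subset)))  # 1
```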

### Clean the Data
Now that you've identified that there are duplicates and missing values, you can clean the data by removing the duplicates and replacing the missing values. The **fillna** function replaces missing values with a specified replacement value. In this case, you'll remove all duplicate rows and replace missing **ArrDelay** and **DepDelay** values with **0**.

In [None]:
data=flights.dropDuplicates().fillna(value=0, subset=["ArrDelay", "DepDelay"])
data.count()
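The cleaning step removes exact duplicates first and then fills the remaining gaps. The plain-Python sketch below mirrors that order with a hypothetical `clean` helper over made-up dict rows. Filling after deduplicating matches the cell above; note that filling can make two previously distinct rows identical, which is a consequence of that ordering.

```python
def clean(rows, columns=("ArrDelay", "DepDelay"), fill=0):
    """Drop exact duplicate rows (keeping the first), then replace None in `columns` with `fill`."""
    seen = set()
    cleaned = []
    for row in rows:
        key = tuple(sorted(row.items()))  # hashable, order-independent identity for the row
        if key in seen:
            continue
        seen.add(key)
        cleaned.append({k: (fill if k in columns and v is None else v) for k, v in row.items()})
    return cleaned

rows = [
    {"Carrier": "DL", "ArrDelay": None, "DepDelay": 4},
    {"Carrier": "DL", "ArrDelay": None, "DepDelay": 4},  # exact duplicate
    {"Carrier": "AA", "ArrDelay": 7, "DepDelay": None},
]
print(clean(rows))
```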

### Explore the Data
Now that you've cleaned the data, you can start to explore it and perform some basic analysis. Let's start by examining the lateness of a flight. The dataset includes the **ArrDelay** field, which tells you how many minutes behind schedule a flight arrived. However, if a flight is only a few minutes behind schedule, you might not consider it *late*. Let's make our definition of lateness such that flights that arrive within 25 minutes of their scheduled arrival time are considered on-time, but any flights that are more than 25 minutes behind schedule are classified as *late*. We'll add a column to indicate this classification:

In [None]:
data = data.select("DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID","DestAirportID",
                   "DepDelay", "ArrDelay", ((col("ArrDelay") > 25).cast("Int").alias("Late")))
display(data.limit(20))

### Explore Summary Statistics and Data Distribution
Getting a good idea of the shape of the data is simple with Spark. If you have a technical specification, you can compare the summary statistics for the columns in the data against it to confirm that your attributes match. The **describe** function returns a dataframe containing the **count**, **mean**, **standard deviation**, **minimum**, and **maximum** values for numeric and string columns; the mean and standard deviation are only meaningful for numeric columns.

In [None]:
display(data.describe())
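For a single numeric column, the statistics reported by `describe` can be sketched in plain Python as below. The `describe` helper here is hypothetical; it skips missing values (as a SQL `count` on a column does) and uses the sample standard deviation, with an n - 1 denominator. The second call shows why the mean of a 0/1 column such as *Late* is the proportion of 1s.

```python
import statistics

def describe(values):
    """Summary statistics for one numeric column, ignoring missing (None) values."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation (n - 1 denominator)
        "min": min(present),
        "max": max(present),
    }

print(describe([1, 2, 3, 4, None]))
# The mean of a 0/1 indicator column is the fraction of rows equal to 1
print(describe([0, 0, 0, 1])["mean"])  # 0.25
```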

The *DayofMonth* is a value between 1 and 31, and the mean is around halfway between these values, which seems about right. The same is true for *DayOfWeek*, which is a value between 1 and 7. *Carrier* is a string, so there are no meaningful numeric statistics, and we can ignore the statistics for the airport IDs because they're just unique identifiers for the airports, not actual numeric values. The departure and arrival delays range from 63 or 94 minutes ahead of schedule to over 1,800 minutes behind schedule. The means are much closer to zero than this, and the standard deviation is quite large, so there's quite a bit of variance in the delays. The *Late* indicator is a 1 or a 0, but the mean is very close to 0, which implies that there are significantly fewer late flights than non-late flights.

Let's verify that assumption by registering the dataframe as a temporary view and using the **Spark SQL** API to run a SQL statement that counts the number of late and non-late flights:

In [None]:
data.createOrReplaceTempView("flightData")
lateCounts = spark.sql("SELECT COUNT(*) AS Count, Late FROM flightData GROUP BY Late")
display(lateCounts)


Yes, it looks like there are significantly more non-late flights than late ones. You can see this more clearly with a visualization, so in the output above, select the **Chart** view to see the comparative counts as a bar chart.

Let's address the outliers and imbalanced classes in our data by removing rows with extreme delay values, and *undersampling* the more common on-time flights:

In [None]:
from pyspark.sql.functions import rand

# Remove outliers - let's make the cut-off 150 minutes.
data = data.filter("DepDelay < 150 AND ArrDelay < 150")

# Separate the late and on-time flights
pos = data.filter("Late = 1")
neg = data.filter("Late = 0")

# Undersample the more prevalent class (without replacement) to get a roughly even distribution
posCount = pos.count()
negCount = neg.count()
if posCount > negCount:
  pos = pos.sample(False, negCount / posCount)
else:
  neg = neg.sample(False, posCount / negCount)

# shuffle into random order (so a sample of the first 1000 has a mix of classes)
data = neg.union(pos).orderBy(rand())

# Replace the temporary table so we can query and visualize the balanced dataset
data.createOrReplaceTempView("flightData")

# Show the statistics
display(data.describe())
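To get a roughly even split, each row of the larger class should be kept with probability smaller/larger, and sampling without replacement avoids duplicating rows. The plain-Python sketch below, using a hypothetical `undersample` helper and made-up labelled rows, shows this as independent per-row (Bernoulli) sampling, which is the approach Spark uses for `DataFrame.sample` without replacement; as in Spark, the resulting count is close to, but not exactly, the target.

```python
import random

def undersample(late, on_time, seed=42):
    """Randomly drop rows from the larger class so both classes end up roughly equal in size."""
    rng = random.Random(seed)
    if late and on_time:
        if len(late) > len(on_time):
            fraction = len(on_time) / len(late)
            late = [row for row in late if rng.random() < fraction]
        else:
            fraction = len(late) / len(on_time)
            on_time = [row for row in on_time if rng.random() < fraction]
    combined = on_time + late
    rng.shuffle(combined)  # random order, like orderBy(rand())
    return combined

late = [("late", i) for i in range(1_000)]
on_time = [("on_time", i) for i in range(10_000)]
balanced = undersample(late, on_time)
# close to 1000; Bernoulli sampling does not give an exact count
print(sum(1 for label, _ in balanced if label == "on_time"))
```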

Now the maximums for the **DepDelay** and **ArrDelay** are clipped at under 150, and the mean value for the binary *Late* class is nearer 0.5; indicating a more or less even number of each class. We removed some data to accomplish this balancing act, but there are still a substantial number of rows for us to train a machine learning model with, and now the data is more balanced. Let's visualize the data again to confirm this:

In [None]:
%%sql
SELECT Late, COUNT(*) AS Count FROM flightData GROUP BY Late

Display the data as a chart to compare the distribution of the **Late** classes as you did previously. There should now be a more even number of each class.

### Explore Relationships in the Data
Predictive modeling is largely based on statistical relationships between fields in the data. To design a good model, you need to understand how the data points relate to one another.

A common way to start exploring relationships is to create visualizations that compare two or more data values. For example, run the following query to compare arrival delay by carrier.


In [None]:
%%sql
SELECT Carrier, ArrDelay FROM flightData

View the output as a chart, and then use the **View Options** button (which looks similar to **&#128463;<sub>*</sub>**) at the top-right of the output to configure the chart as follows:

- **Chart type**: Box plot
- **Key**: Carrier
- **Values**: ArrDelay

When you apply the view options, the box plots should show that the median delay (the line in the middle of the box) and the distribution of delays vary by carrier, with some carriers having a higher median delay than others. The same is true for other features, such as the day of the week and the destination airport.
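The line in the middle of each box is the median of that carrier's delays. A plain-Python sketch of computing it per carrier, with a hypothetical `median_delay_by_carrier` helper and made-up (Carrier, ArrDelay) pairs; a full box plot would also need the quartiles that form the box edges.

```python
import statistics
from collections import defaultdict

def median_delay_by_carrier(rows):
    """Median ArrDelay for each Carrier, from (carrier, arr_delay) pairs."""
    delays = defaultdict(list)
    for carrier, arr_delay in rows:
        delays[carrier].append(arr_delay)
    return {carrier: statistics.median(values) for carrier, values in sorted(delays.items())}

rows = [("DL", 5), ("DL", -3), ("DL", 12), ("AA", 2), ("AA", 10)]
print(median_delay_by_carrier(rows))  # {'AA': 6.0, 'DL': 5}
```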



You may already suspect that there's likely to be a relationship between departure delay and arrival delay, so let's examine that next. Run the next cell to retrieve the departure and arrival delays for each flight.

In [None]:
%%sql
SELECT DepDelay, ArrDelay FROM flightData

View the output as a chart, and set the view options as follows:

- **Chart type**: Scatter chart
- **Key**: DepDelay
- **Values**: ArrDelay
- **Series Group**: *blank*
- **Aggregation**: Avg

The scatter plot shows the average arrival delay for each recorded departure delay. Note that the points form a diagonal line, which indicates a strong linear relationship between departure delay and arrival delay.
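With **Aggregation** set to **Avg**, each point is the mean `ArrDelay` over all flights that share a `DepDelay` value, equivalent to `SELECT DepDelay, AVG(ArrDelay) FROM flightData GROUP BY DepDelay`. A plain-Python sketch with a hypothetical `average_by_key` helper and made-up (DepDelay, ArrDelay) pairs:

```python
from collections import defaultdict

def average_by_key(pairs):
    """Average the second value for each distinct first value, sorted by key."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for dep_delay, arr_delay in pairs:
        sums[dep_delay] += arr_delay
        counts[dep_delay] += 1
    return {key: sums[key] / counts[key] for key in sorted(sums)}

pairs = [(0, -2), (0, 4), (10, 12)]
print(average_by_key(pairs))  # {0: 1.0, 10: 12.0}
```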

This linear relationship shows a *correlation* between these two values, which we can measure statistically. The **corr** function calculates a correlation value between -1 and 1, indicating the strength of correlation between two fields. A strong positive correlation (near 1) indicates that high values for one column are often found with high values for the other, while a strong negative correlation (near -1) indicates that *low* values for one column are often found with *high* values for the other. A correlation near 0 indicates little apparent relationship between the fields.
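`DataFrame.corr` currently supports only the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations. The plain-Python sketch below, with a hypothetical `pearson` helper and made-up values, computes it directly; it returns NaN when either column is constant, because the coefficient is undefined in that case.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined when a column has no variance
    return cov / math.sqrt(var_x * var_y)

dep = [0, 5, 10, 20]
print(round(pearson(dep, [2 * d + 3 for d in dep]), 6))  # 1.0
print(round(pearson(dep, [-d for d in dep]), 6))         # -1.0
```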

Run the following cell to see the correlation statistic for these two variables.

In [None]:
data.corr("DepDelay", "ArrDelay")

The correlation is close to 1, indicating a strong positive correlation between departure delay and arrival delay. Flights that depart late, unsurprisingly, often arrive late!

## Train a machine learning model

In a real scenario, a data scientist would explore the statistical distributions and relationships in the data in more depth, and perform *feature engineering* to prepare the dataset for training a machine learning model. In this exercise, we've provided a prepared version of the data for you. Use the following code to load it into a dataframe.

In [None]:
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
  StructField("Late", IntegerType(), False),
])

data = spark.read.csv('/data/flights.csv', schema=flightSchema, header=True)
display(data.limit(20))

In this notebook, you've explored how to load data from the data lake into a Spark notebook, and how to manipulate and transform it for consumption by downstream analysts and data scientists.

We've only scratched the surface of what can be done with notebooks.