# Exploring and fixing data with Synapse Spark

In this task, you will use a Synapse Spark notebook to explore a few of the files in the **wwi-02/sale-poc** folder in the data lake. You will also use Python code to fix the issues with the **sale-20170502.csv** file.

1. First, attach this notebook to the **SparkPool01** Spark pool.
2. In the code cell below, replace **asadatalake*SUFFIX*** with the name of the primary data lake storage account associated with your Synapse workspace. Then execute the cell by selecting the **Run cell** button that becomes visible when you select the cell.

> **Note**: The cell may take some time to run because the Spark cluster must be started.

In [1]:
adls_account_name = 'vksa042772'

StatementMeta(venkysparkpool, 0, 2, Finished, Available)

## Exploring files with Spark

1. The first step in exploring data using Synapse Spark is to load a file from the data lake. For this, we'll use the **spark.read.load()** method of the **SparkSession** to load the **sale-20170501.csv** file into a [DataFrame](https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#datasets-and-dataframes).


In [2]:
# First, load the `sale-20170501.csv` file, which we know from our previous exploration to be formatted correctly.
# Note the use of the `header` and `inferSchema` parameters. `header` indicates that the first row of the file contains column headers,
# and `inferSchema` instructs Spark to use the data within the file to infer data types.
df = spark.read.load(f'abfss://raw042772@{adls_account_name}.dfs.core.windows.net/wwi-02/sale-poc/sale-20170501.csv', format='csv', header=True, inferSchema=True)

StatementMeta(venkysparkpool, 0, 3, Finished, Available)
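
The path passed to `load()` follows the pattern `abfss://<container>@<account>.dfs.core.windows.net/<path>`. As a small convenience sketch, the helper below assembles that URI from its parts so the container and account names are written only once. `build_abfss_path` is a hypothetical name introduced here for illustration; it is not part of the lab or of any Spark API.

```python
def build_abfss_path(container: str, account: str, relative_path: str) -> str:
    """Assemble an ABFSS URI for a file in a data lake storage account."""
    # Strip a leading slash so the URI never contains a double slash after the host.
    return f"abfss://{container}@{account}.dfs.core.windows.net/{relative_path.lstrip('/')}"


# Container and account values copied from the cells above.
path = build_abfss_path("raw042772", "vksa042772", "wwi-02/sale-poc/sale-20170501.csv")
print(path)
```

The resulting string can be passed to `spark.read.load()` in place of the inline f-string.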

## View the contents of the DataFrame

With the data from the **sale-20170501.csv** file loaded into a data frame, we can now use various methods of a data frame to explore the properties of the data.

1. Let's look at the data as it was imported. Execute the cell below to view and inspect the data in the data frame.

In [3]:
display(df.limit(10))

StatementMeta(venkysparkpool, 0, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9b7433b8-88d7-42b8-a098-096e98db1e82)

2. As we saw during exploration with the SQL on-demand capabilities of Azure Synapse, Spark lets us view and query the data contained within files.

3. Now, use the **printSchema()** method of the data frame to view the results of using the **inferSchema** parameter when creating the data frame. Execute the cell below and observe the output.

In [4]:
# Now, print the inferred schema. We will need this information below to help with the missing headers in the May 2, 2017 file.
df.printSchema()

StatementMeta(venkysparkpool, 0, 5, Finished, Available)

root
 |-- TransactionId: string (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- ProductId: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- TransactionDate: integer (nullable = true)
 |-- ProfitAmount: double (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Minute: integer (nullable = true)
 |-- StoreId: integer (nullable = true)
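
The types in the schema above come from `inferSchema`, which examines the values in each column. The sketch below is a rough, pure-Python approximation of that idea and not Spark's actual implementation: it only distinguishes integer, double, and string, and the function name `infer_column_type` and the sample values are made up for illustration.

```python
def infer_column_type(values):
    """Return 'integer', 'double', or 'string' for a list of raw text values."""
    def parses_as(cast):
        # A column has a type only if every non-empty value converts cleanly.
        for value in values:
            if value == "":
                continue  # treat empty text as a null, which fits any type
            try:
                cast(value)
            except ValueError:
                return False
        return True

    if parses_as(int):
        return "integer"
    if parses_as(float):
        return "double"
    return "string"


# Made-up sample values, shaped like three of the columns in the schema above.
print(infer_column_type(["20170501", "20170501"]))   # integer
print(infer_column_type(["12.5", "3"]))              # double
print(infer_column_type(["a1b2-c3", "d4e5-f6"]))     # string
```

This also suggests why **TransactionDate** appears as an integer: a value such as `20170501` is a valid whole number, so nothing in the text marks it as a date.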

4. The **printSchema** method outputs both field names and data types that are based on the Spark engine's evaluation of the data contained within each field.

    > We can use this information later to help define the schema for the poorly formed **sale-20170502.csv** file. In addition to the field names and data types, we should note the number of features or columns contained in the file. In this case, note that there are 11 fields. That will be used to determine where to split the single row of data.

5. As an example of further exploration we can do, run the cell below to create and display a new data frame that contains an ordered list of distinct Customer and Product Id pairings. We can use these types of functions to find invalid or empty values quickly in targeted fields.

In [5]:
# Create a new data frame containing a list of distinct CustomerId and ProductId values, sorted by CustomerId (orderBy sorts in ascending order by default).
df_distinct_products = df.select('CustomerId', 'ProductId').distinct().orderBy('CustomerId')

# Display the first 100 rows of the resulting data frame.
display(df_distinct_products.limit(100))

StatementMeta(venkysparkpool, 0, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, a2977882-fd01-4d51-9de5-a0c8941f4891)

6. Next, let's attempt to open and explore the **sale-20170502.csv** file using the **load()** method, as we did above.

In [6]:
# Next, let's try to read in the May 2, 2017 file using the same `load()` method we used for the first file.
df = spark.read.load(f'abfss://raw042772@{adls_account_name}.dfs.core.windows.net/wwi-02/sale-poc/sale-20170502.csv', format='csv')
display(df.limit(10))

StatementMeta(venkysparkpool, 0, 7, Finished, Available)

Py4JJavaError: An error occurred while calling o650.load.
: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - 20480
Hint: Number of columns processed may have exceeded limit of 20480 columns. Use settings.setMaxColumns(int) to define the maximum number of columns your input can have
Ensure your configuration is correct, with delimiters, quotes and escape sequences that match the input format you are trying to parse
Parser Configuration: CsvParserSettings:
	Auto configuration enabled=true
	Auto-closing enabled=true
	Autodetect column delimiter=false
	Autodetect quotes=false
	Column reordering enabled=true
	Delimiters for detection=null
	Empty value=
	Escape unquoted values=false
	Header extraction enabled=null
	Headers=null
	Ignore leading whitespaces=false
	Ignore leading whitespaces in quotes=false
	Ignore trailing whitespaces=false
	Ignore trailing whitespaces in quotes=false
	Input buffer size=1048576
	Input reading on separate thread=false
	Keep escape sequences=false
	Keep quotes=false
	Length of content displayed on error=1000
	Line separator detection enabled=false
	Maximum number of characters per column=-1
	Maximum number of columns=20480
	Normalize escaped line separators=true
	Null value=
	Number of records to read=all
	Processor=none
	Restricting data in exceptions=false
	RowProcessor error handler=null
	Selected fields=none
	Skip bits as whitespace=true
	Skip empty lines=true
	Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
	CsvFormat:
		Comment character=#
		Field delimiter=,
		Line separator (normalized)=\n
		Line separator sequence=\n
		Quote character="
		Quote escape character=\
		Quote escape escape character=null
Internal state when error was thrown: line=0, column=20481, record=0, charIndex=277709
	at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
	at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:707)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$inferFromDataset$1(CSVDataSource.scala:125)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.inferFromDataset(CSVDataSource.scala:125)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:113)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:65)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:209)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:206)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:419)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:308)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 20480
	at com.univocity.parsers.common.ParserOutput.valueParsed(ParserOutput.java:373)
	at com.univocity.parsers.csv.CsvParser.parseSingleDelimiterRecord(CsvParser.java:189)
	at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:109)
	at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:687)
	... 26 more


7. As we saw in T-SQL, Spark returns a similar error: the number of columns processed may have exceeded the limit of 20480 columns. To work with the data in this file, we need more advanced methods, as you will see in the next section.
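
The error follows from the shape of the file: with no line breaks, the parser treats every value as another column of a single record. A quick way to spot this is to compare the number of lines with the largest number of fields on any line. The sketch below does that on small made-up strings; `describe_shape` is a hypothetical helper, and it counts delimiters without handling quoted fields, which is enough for a first check.

```python
def describe_shape(text: str, delimiter: str = ","):
    """Return (line_count, max_fields_on_any_line) for delimited text."""
    lines = text.splitlines()
    # Fields on a line = delimiters + 1 (quoting is ignored in this quick check).
    max_fields = max((line.count(delimiter) + 1 for line in lines), default=0)
    return len(lines), max_fields


well_formed = "a,b,c\n1,2,3\n4,5,6\n"
single_line = "1,2,3,4,5,6,7,8,9"   # same kind of values, but no line breaks

print(describe_shape(well_formed))  # (3, 3)
print(describe_shape(single_line))  # (1, 9)
```

A file with one line and a very large field count has the same symptom as **sale-20170502.csv**.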


## Handling and fixing poorly formed CSV files

> The steps below provide example code for fixing the poorly formed CSV file, **sale-20170502.csv**, which we discovered while exploring the files in the **wwi-02/sale-poc** folder. This is just one of many ways to fix a poorly formed CSV file using Spark.

1. To "fix" the bad file, we need to take a programmatic approach, using Python to read in the contents of the file and then parse them to put them into the proper shape.

    > To handle the data being in a single row, we can use the **textFile()** method of our **SparkContext** to read the file as a collection of rows into a resilient distributed dataset (RDD). This avoids the errors caused by the number of columns because we are essentially getting a single string value stored in a single column.

2. Execute the cell below to load the RDD with data from the file.

In [7]:
# Import the NumPy library. NumPy is a Python library used for working with arrays.
import numpy as np

# Read the CSV file into a resilient distributed dataset (RDD) as a text file. This will read each row of the file into rows in an RDD.
rdd = sc.textFile(f'abfss://raw042772@{adls_account_name}.dfs.core.windows.net/wwi-02/sale-poc/sale-20170502.csv')

StatementMeta(venkysparkpool, 0, 8, Finished, Available)

3. With the data now stored in an RDD, we can access the first, and only, populated row in the RDD and split it into individual fields. We know from our inspection of the file in Notepad++ that all the fields are separated by a comma (,), so let's start by splitting on that to create an array of field values. Execute the cell below to create a data array.

In [8]:
# Since we know there is only one row, grab the first row of the RDD and split it on the field delimiter (comma).
data = rdd.first().split(',')

field_count = len(data)
# Print out the count of fields read into the array.
print(field_count)

StatementMeta(venkysparkpool, 0, 9, Finished, Available)

6570961

4. By splitting the row on the field delimiter, we created an array of all the individual field values in the file, the count of which you can see above.

5. Now, run the cell below to do a quick calculation on the expected number of rows that will be generated by parsing every 11 fields into a single row.

In [9]:
import math

expected_row_count = math.floor(field_count / 11)
print(f'The expected row count is: {expected_row_count}')

StatementMeta(venkysparkpool, 0, 10, Finished, Available)

The expected row count is: 597360
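
Note that `math.floor` discards any remainder, so it does not show whether the field count is an exact multiple of 11. As a small check, `divmod` returns both the number of complete rows and the number of leftover values. The figures below are the ones printed by the cells above; the variable names `columns_per_row`, `full_rows`, and `leftover` are introduced here for illustration.

```python
field_count = 6570961      # value printed by the field-count cell above
columns_per_row = 11       # column count from the May 1 schema

full_rows, leftover = divmod(field_count, columns_per_row)
print(full_rows, leftover)  # 597360 1
```

One value is left over, so it will not belong to any complete 11-column row. Inspecting the end of the array (for example `data[-1]`) is a reasonable way to see what it contains before deciding to ignore it.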

6. Next, let's create an array to store the data associated with each "row".

    > We will set the max_index to the number of columns that are expected in each row. We know from our exploration of other files in the **wwi-02/sale-poc** folder that they contain 11 columns, so that is the value we will set.

7. In addition to setting variables, we will use the cell below to loop through the **data** array and assign every 11 values to a row. By doing this, we are able to "split" the data that was once a single row into appropriate rows containing the proper data and columns from the file.

8. Execute the cell below to create an array of rows from the file data.

In [10]:
# Create an array to store the data associated with each "row". Set the max_index to the number of columns that are in each row. This is 11, which we noted above when viewing the schema of the May 1 file.
row_list = []
max_index = 11

# Now, we are going to loop through the array of values extracted from the single row of the file and build rows consisting of 11 columns.
while max_index <= len(data):
    row = [data[i] for i in np.arange(max_index-11, max_index)]
    row_list.append(row)

    max_index += 11

print(f'The row array contains {len(row_list)} rows. The expected number of rows was {expected_row_count}.')

StatementMeta(venkysparkpool, 0, 11, Finished, Available)

The row array contains 597360 rows. The expected number of rows was 597360.
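
The same grouping can also be written with list slicing instead of an index loop. The sketch below is an alternative under the same assumption (a fixed number of columns per row), shown on a tiny made-up list rather than the real `data` array; `chunk_fields` is a hypothetical helper name. Like the loop above, it drops any incomplete group at the end.

```python
def chunk_fields(values, width):
    """Split a flat list into consecutive rows of `width` values, dropping any incomplete tail."""
    usable = len(values) - (len(values) % width)
    return [values[start:start + width] for start in range(0, usable, width)]


# Tiny stand-in for the `data` array: 7 values, 3 columns per row.
sample = ["a", "b", "c", "d", "e", "f", "g"]
rows = chunk_fields(sample, 3)
print(rows)  # [['a', 'b', 'c'], ['d', 'e', 'f']]
```

Applied to the notebook's array, the call would be `chunk_fields(data, 11)`.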

9. The last thing we need to do to be able to work with the file data as rows is to read it into a Spark DataFrame. In the cell below, we use the **createDataFrame()** method to convert the **row_list** array into a data frame, while also adding names for the columns. Column names are based on the schema we observed in the well-formatted files in the **wwi-02/sale-poc** directory.

10. Execute the cell below to create a data frame containing row data from the file and then display the first 10 rows.

In [11]:
# Finally, we can use the row_list we created above to create a DataFrame. We can add to this a schema parameter, which contains the column names we saw in the schema of the first file.
df_fixed = spark.createDataFrame(row_list, schema=['TransactionId', 'CustomerId', 'ProductId', 'Quantity', 'Price', 'TotalAmount', 'TransactionDate', 'ProfitAmount', 'Hour', 'Minute', 'StoreId'])
display(df_fixed.limit(10))

StatementMeta(venkysparkpool, 0, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, b5f00132-39d3-4f76-9bd0-63e20428eccc)
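
Because every value in `row_list` came from splitting a string, the columns of the data frame created above hold text rather than numbers. If typed columns are needed, one option is to convert each row in plain Python before creating the data frame. The sketch below assumes the column order and types printed for the May 1 file; `COLUMN_CASTS` and `cast_row` are hypothetical names, and the sample row is made up.

```python
# Column order and types follow the schema printed for the May 1 file.
COLUMN_CASTS = [str, int, int, int, float, float, int, float, int, int, int]


def cast_row(row):
    """Convert a row of 11 text values to the types listed in COLUMN_CASTS."""
    if len(row) != len(COLUMN_CASTS):
        raise ValueError(f"expected {len(COLUMN_CASTS)} values, got {len(row)}")
    return [cast(value) for cast, value in zip(COLUMN_CASTS, row)]


# Made-up values for illustration only, not taken from the file.
sample_row = ["t-001", "17", "42", "2", "9.99", "19.98", "20170502", "4.5", "13", "7", "3"]
print(cast_row(sample_row))
```

A list built with `[cast_row(r) for r in row_list]` could then be passed to `createDataFrame()` in place of `row_list`. A value that does not convert raises `ValueError`, which also serves as a basic validity check on the parsed data.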

## Write the "fixed" file into the data lake

1. The last step we will take as part of our exploration and file fixing process is to write the data back into the data lake, so it can be ingested following the same process as the other files in the **wwi-02/sale-poc** folder.

2. Execute the cell below to save the data frame into the data lake as a series of files in a folder named **sale-20170502-fixed**.

    > Note: Spark parallelizes workloads across worker nodes, so when saving files, you will notice they are saved as a collection of "part" files, and not as a single file. While there are some libraries you can use to create a single file, it is helpful to get used to working with files generated via Spark notebooks as they are natively created.


In [13]:
df_fixed.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save(f'abfss://raw042772@{adls_account_name}.dfs.core.windows.net/wwi-02/sale-poc/sale-20170502-fixed')

StatementMeta(venkysparkpool, 0, 14, Finished, Available)
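
With the `header` option enabled, each part file is expected to start with its own header row. The sketch below shows one way to read such a folder outside Spark, assuming the part files have been copied to a local directory; it keeps the first header and skips the repeats. `read_part_files` is a hypothetical helper, and the two small files it reads are made-up stand-ins created in a temporary folder.

```python
import csv
import glob
import os
import tempfile


def read_part_files(folder):
    """Read every part-*.csv in `folder`, returning (header, rows) with repeated headers removed."""
    header, rows = None, []
    for path in sorted(glob.glob(os.path.join(folder, "part-*.csv"))):
        with open(path, newline="") as handle:
            reader = csv.reader(handle)
            file_header = next(reader, None)
            if file_header is None:
                continue  # empty part file
            header = header or file_header
            rows.extend(reader)
    return header, rows


# Build two small stand-in part files in a temporary folder (made-up data).
with tempfile.TemporaryDirectory() as folder:
    for name, body in [("part-00000.csv", "Id,Qty\n1,2\n"), ("part-00001.csv", "Id,Qty\n3,4\n")]:
        with open(os.path.join(folder, name), "w", newline="") as handle:
            handle.write(body)
    header, rows = read_part_files(folder)

print(header, rows)  # ['Id', 'Qty'] [['1', '2'], ['3', '4']]
```

Within Spark itself, no such step is needed: pointing a read at the folder path loads all part files together.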

In [14]:
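from pyspark.sql import functions as F

# Summarize profit and quantity by transaction date and product.
# `df` still holds the May 1 data here, because the attempt to load the May 2 file above failed.
# The functions are referenced through the `F` alias so they do not shadow Python's built-in sum() and round().
profitByDateProduct = (df.groupBy("TransactionDate", "ProductId")
    .agg(
        F.sum("ProfitAmount").alias("(sum)ProfitAmount"),
        F.round(F.avg("Quantity"), 4).alias("(avg)Quantity"),
        F.sum("Quantity").alias("(sum)Quantity"))
    .orderBy("TransactionDate"))
display(profitByDateProduct.limit(100))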

StatementMeta(venkysparkpool, 0, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 8e3ffe95-0ef4-47f6-98fa-107a7c998114)

## Inspect the fixed file in the data lake

1. With the fixed file written to the data lake, you can quickly inspect it to verify that the files are now formatted properly. Select the **wwi-02** tab above to view the **sale-poc** folder.
2. Refresh the folder view (expand the **More** menu if necessary) and then open the **sale-20170502-fixed** folder.
3. In the **sale-20170502-fixed** folder, right-click the first file whose name begins with **part** and whose extension is **.csv** and select **Preview** from the context menu.
4. In the **Preview** dialog, verify you see the proper columns and that the data looks valid in each field.
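
The visual check in the preview can be complemented by a programmatic one. The sketch below reports the records whose field count differs from the expected number of columns; it runs on small made-up CSV text with three columns, and `find_bad_lines` is a hypothetical helper. For the fixed sales files, the expected count would be 11.

```python
import csv
import io


def find_bad_lines(csv_text, expected_columns):
    """Return 1-based record numbers whose field count differs from `expected_columns`."""
    reader = csv.reader(io.StringIO(csv_text))
    return [number for number, fields in enumerate(reader, start=1)
            if len(fields) != expected_columns]


good = "Id,Qty,Price\n1,2,9.99\n"
bad = "Id,Qty,Price\n1,2\n3,4,5.0,extra\n"

print(find_bad_lines(good, 3))  # []
print(find_bad_lines(bad, 3))   # [2, 3]
```

An empty result means every record, including the header, has the expected number of fields.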

## Wrap-up

Throughout this exercise, you used a Spark notebook to explore data stored within files in the data lake. You used Python code to extract data from a poorly formatted CSV file, assemble the data from that file into proper rows, and then write the "fixed" file back out into your data lake.

You can now return to the lab guide to continue with the next section of Lab 2.
