# Example of using `pyspark` in `kedro`

This notebook demonstrates how to use `spark` within the `kedro` framework. The main benefit of `spark` lies in its **distributed, lazy-loading paradigm**: first, `spark` can process large amounts of data without loading the entire data set into memory; second, `spark` pipelines scale horizontally across the nodes of a compute cluster with little extra effort.

Below, you'll find examples of how to:
1. Load a `spark` session and use it to convert a pandas DataFrame into a spark DataFrame
2. Add a `SparkDataSet` to the kedro catalog
3. Save and load a `spark` DataFrame to/from the kedro catalog
4. Use `pyspark` to transform and analyze data

---

## Load a `spark` session and convert a pandas DataFrame into a `spark` DataFrame
One of spark's core principles is that an application normally works with a single, shared spark session. In our case, this spark session was already configured and created when we launched this kedro notebook (take a look at `src/context.py` to convince yourself).
Hence, we can simply call `SparkSession.builder.getOrCreate()` to reuse that pre-configured spark session.

The data that we convert from a pandas into a spark DataFrame is a publicly available retail data set from the UCI Machine Learning Repository. In the end, we'll use spark to analyze which customers generated the most revenue.
We also provide a **schema** when we convert the data into a spark DataFrame. With an explicit schema, spark does not have to scan the data up front to infer the data types itself, which **reduces the memory footprint** of the underlying computation. Note that missing `CustomerID` values are replaced with `0` before the conversion, so that the column can be cast to the `INT` type declared in the schema.

In [3]:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.getOrCreate()

RETAIL_DATA_SCHEMA = 'InvoiceNo STRING, StockCode STRING, Description STRING, Quantity INT, InvoiceDate TIMESTAMP, '+\
    'UnitPrice FLOAT, CustomerID INT, Country STRING'

retail_data = pd.read_excel(
    'https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx',
)
retail_data_spark = spark.createDataFrame(
    data=retail_data.assign(CustomerID=retail_data['CustomerID'].fillna(0).astype(int)),
    schema=RETAIL_DATA_SCHEMA
)
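The schema string above is written out by hand, and the same literal appears again in a later cell. As a minimal sketch, the string could instead be built once from a column-to-type mapping. `RETAIL_DATA_COLUMNS` and `build_ddl_schema` are hypothetical names introduced here for illustration; they are not part of pyspark or kedro.

```python
# Column names and Spark SQL types, copied from the schema used in this notebook.
RETAIL_DATA_COLUMNS = {
    "InvoiceNo": "STRING",
    "StockCode": "STRING",
    "Description": "STRING",
    "Quantity": "INT",
    "InvoiceDate": "TIMESTAMP",
    "UnitPrice": "FLOAT",
    "CustomerID": "INT",
    "Country": "STRING",
}


def build_ddl_schema(columns):
    """Join a {name: type} mapping into a DDL-style schema string (hypothetical helper)."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns.items())


RETAIL_DATA_SCHEMA = build_ddl_schema(RETAIL_DATA_COLUMNS)
print(RETAIL_DATA_SCHEMA)
```

The resulting string is identical to the hand-written one, so it can be passed to `spark.createDataFrame(..., schema=RETAIL_DATA_SCHEMA)` in the same way.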

## Add a `SparkDataSet` to the kedro catalog

For demo purposes, we don't use `context.catalog` but create our own local `catalog` object. However, the following steps (reading, writing, analyzing) would work in exactly the same way if `retail_data_spark` were defined in `catalog.yml`.

Note the `load_args` and `save_args` that we pass to the `SparkDataSet`. In particular, providing a schema via `load_args` is crucial for performance. Moreover, make sure to set `mode` to `overwrite` when your pipelines write to the same data set several times, because spark's default save mode raises an error if the target already exists.

In [7]:
# create a DataCatalog and add a SparkDataSet
from kedro.io import DataCatalog
from kedro.extras.datasets.spark import SparkDataSet
import os

RETAIL_DATA_SCHEMA = 'InvoiceNo STRING, StockCode STRING, Description STRING, Quantity INT, InvoiceDate TIMESTAMP, '+\
    'UnitPrice FLOAT, CustomerID INT, Country STRING'

catalog = DataCatalog({'retail_data_spark':SparkDataSet(
        filepath=os.path.join(context.project_path,'data','01_raw','retail_data'),
        file_format='csv',
        load_args={'header':True, 'schema':RETAIL_DATA_SCHEMA},
        save_args={'header':True, 'mode':'overwrite'}
)})
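For comparison, here is a sketch of the same data set expressed as configuration, which is roughly what an entry in `catalog.yml` amounts to once it is parsed into a dictionary. It assumes that kedro resolves the short type name `spark.SparkDataSet` and that the relative `filepath` is interpreted from the project root; `catalog_config` and `build_catalog` are hypothetical names used only for illustration.

```python
RETAIL_DATA_SCHEMA = (
    "InvoiceNo STRING, StockCode STRING, Description STRING, Quantity INT, "
    "InvoiceDate TIMESTAMP, UnitPrice FLOAT, CustomerID INT, Country STRING"
)

# Dictionary form of a catalog entry; the same keys would appear in catalog.yml.
catalog_config = {
    "retail_data_spark": {
        "type": "spark.SparkDataSet",           # assumed short type name
        "filepath": "data/01_raw/retail_data",  # assumed relative to the project root
        "file_format": "csv",
        "load_args": {"header": True, "schema": RETAIL_DATA_SCHEMA},
        "save_args": {"header": True, "mode": "overwrite"},
    }
}


def build_catalog(config):
    """Create a DataCatalog from a configuration dictionary (hypothetical helper)."""
    # Imported lazily so the configuration itself can be inspected without kedro.
    from kedro.io import DataCatalog

    return DataCatalog.from_config(config)
```

Calling `build_catalog(catalog_config)` inside the kedro notebook should then yield a catalog that can be used like the one constructed by hand above.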

## Save and load a spark DataFrame to/from the kedro catalog

Once the `SparkDataSet` is configured, we can save to and load from it just as we would with any other kedro data set. The only difference is that, since we're using `spark`, we get the data back as a `spark` DataFrame rather than a pandas DataFrame (including all the useful lazy-loading and scalability benefits of spark).

In [8]:
catalog.save('retail_data_spark', retail_data_spark)

2021-05-12 22:12:45,895 - kedro.io.data_catalog - INFO - Saving data to `retail_data_spark` (SparkDataSet)...



In [9]:
retail_data_spark_loaded = catalog.load('retail_data_spark')

2021-05-12 22:12:54,974 - kedro.io.data_catalog - INFO - Loading data from `retail_data_spark` (SparkDataSet)...


## Use `pyspark` to transform and analyze data

The section below showcases some typical data analysis tasks in pyspark (e.g. data inspection and aggregation).

In [10]:
# inspecting the data
retail_data_spark_loaded.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [11]:
from pyspark.sql import functions as f

# revenue per invoice line = Quantity * UnitPrice, summed per customer, highest first
revenue_per_customer = (
    retail_data_spark_loaded
    .withColumn("invoice_amount", f.col("Quantity") * f.col("UnitPrice"))
    .groupBy("CustomerID")
    .agg(f.sum("invoice_amount").alias("invoice_amount"))
    .orderBy(f.col("invoice_amount").desc())
)

In [12]:
revenue_per_customer.show()

+----------+------------------+
|CustomerID|    invoice_amount|
+----------+------------------+
|         0| 1447682.118258085|
|     14646|279489.01930066943|
|     18102|256438.48995232582|
|     17450| 187482.1704902649|
|     14911|132572.61962211132|
|     12415|123725.44990730286|
|     14156|113384.13939154148|
|     17511| 88125.37999302149|
|     16684| 65892.07912826538|
|     13694| 62653.09981799126|
|     15311| 59419.33947509527|
|     13089| 57385.88000047207|
|     14096| 57120.91003343463|
|     15061|54228.739621818066|
|     17949|52750.839977264404|
|     15769| 51823.72010803223|
|     16029| 50992.60983848572|
|     14298|  50862.4400164485|
|     14088| 50415.48962235451|
|     17841| 40340.77974051237|
+----------+------------------+
only showing top 20 rows
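
Note that the top row, `CustomerID` 0, is not a real customer: it collects all invoice lines whose `CustomerID` was missing and was replaced with `0` before the conversion to spark. The plain-Python sketch below reproduces the same group, sum, and sort logic on a few made-up rows and skips that placeholder when ranking. The sample rows and the name `PLACEHOLDER_ID` are assumptions for illustration only; in pyspark, the same exclusion would be a filter on `CustomerID` before the `groupBy`.

```python
from collections import defaultdict

# Made-up sample rows: (CustomerID, Quantity, UnitPrice).
# CustomerID 0 stands in for a missing customer, as in the notebook.
rows = [
    (0, 2, 2.5),
    (14646, 4, 1.25),
    (0, 10, 1.5),
    (18102, 1, 8.0),
    (14646, 2, 2.0),
]

PLACEHOLDER_ID = 0  # hypothetical constant naming the fill value

# Sum Quantity * UnitPrice per customer (the groupBy + agg step).
revenue = defaultdict(float)
for customer_id, quantity, unit_price in rows:
    revenue[customer_id] += quantity * unit_price

# Rank real customers by revenue, highest first (the orderBy step).
ranked = sorted(
    ((cid, amount) for cid, amount in revenue.items() if cid != PLACEHOLDER_ID),
    key=lambda item: item[1],
    reverse=True,
)
print(ranked)  # [(14646, 9.0), (18102, 8.0)]
```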

