# Spark Syntax

On this laptop I have installed PySpark using Anaconda. The goal is to demo the setup of a quick Delta dataset that comes with features such as version history.

The data used in this example is taken from Kaggle here: https://www.kaggle.com/zynicide/wine-reviews



## Initiate Spark 

This demo uses PySpark. In the future a standalone writer (one that does not need Spark) will be available, and at that point you could run this purely in Python.
If that is released, I will create a Python notebook as well.

In [7]:
# remove the Delta table (including its _checkpoint folder) left over from a previous run
!rmdir /s /q "C:\Spark\data\wine\wine_delta"

The system cannot find the file specified.


In [8]:
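import pyspark
from delta import configure_spark_with_delta_pip

# register Delta Lake's SQL extension and catalog on the session
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# adds the Delta Lake jar that matches the pip-installed delta-spark package
spark = configure_spark_with_delta_pip(builder).getOrCreate()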

## Create a Schema
As we want to read the data as a stream, we need to define a schema up front: Spark's file-based streaming sources do not infer one by default. In this example I simply load the data once with the regular batch reader, with `inferSchema` set to `True`, and reuse the resulting schema for the stream.

In [9]:
df = spark.read.csv(header=True, path='../rawdata/wine/wine.csv', inferSchema=True)
inferred_schema = df.schema
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- taster_name: string (nullable = true)
 |-- taster_twitter_handle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [10]:
df.toPandas().head(5)

|   | country | points | price | province | region_1 | region_2 | taster_name | taster_twitter_handle | title | variety | winery |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Italy | 88 | 41.0 | Sicily & Sardinia | Etna |  |  |  | Benanti 2005 Il Monovitigno (Etna) | Nerello Mascalese | Benanti |
| 1 | US | 88 | 60.0 | Oregon | Willamette Valley | Willamette Valley | Paul Gregutt | @paulgwine | Benton-Lane 2006 First Class Pinot Noir (Willa... | Pinot Noir | Benton-Lane |
| 2 | France | 88 | 26.0 | Southwest France | Montravel |  | Roger Voss | @vossroger | Château Puy-Servain 2003 Vieilles Vignes (Mon... | Bordeaux-style Red Blend | Château Puy-Servain |
| 3 | France | 88 | 25.0 | Southwest France | Montravel |  | Roger Voss | @vossroger | Château Puy-Servain 2007 Marjolaine (Montravel) | Bordeaux-style White Blend | Château Puy-Servain |
| 4 | France | 88 | 16.0 | Loire Valley | Quincy |  | Roger Voss | @vossroger | Domaine Philippe Portier 2007 Quincy | Sauvignon Blanc | Domaine Philippe Portier |


## Create the Stream to read in the Raw Data
In the following section we read in the data from the `wine_source` folder.

In [12]:
source_path = '../rawdata/wine/wine_source'
sink_path = "../data/wine/wine_delta"

In [13]:
# reset the demo: delete the files a previous run copied into wine_source (Windows `del`)
! del "C:\Spark\rawdata\wine\wine_source\wine_new.csv"
! del "C:\Spark\rawdata\wine\wine_source\wineupdate.csv"

In [14]:
data = (spark
        .readStream
        .format("csv")
        .option("header", True)
        .schema(inferred_schema)
        .load(source_path)
       )
data

DataFrame[country: string, points: int, price: double, province: string, region_1: string, region_2: string, taster_name: string, taster_twitter_handle: string, title: string, variety: string, winery: string]

In [18]:
saveloc = sink_path
streamQuery = (data
              .writeStream
              .format('delta')
              .option('checkpointLocation', f"{saveloc}/_checkpoint")
              .trigger(once=True)
              .start(saveloc)
              )

In [21]:
streamQuery.recentProgress

[{'id': '224a9794-bf0b-4788-9cfe-86f9e91a8b6e',
  'runId': '7bb58882-1339-4592-b20a-5948da2443cb',
  'name': None,
  'timestamp': '2021-11-10T18:26:03.347Z',
  'batchId': 0,
  'numInputRows': 0,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 0.0,
  'durationMs': {'addBatch': 5430,
   'getBatch': 24,
   'queryPlanning': 14,
   'triggerExecution': 6222},
  'stateOperators': [],
  'sources': [{'description': 'FileStreamSource[file:/C:/Spark/rawdata/wine/wine_source]',
    'startOffset': None,
    'endOffset': {'logOffset': 0},
    'numInputRows': 0,
    'inputRowsPerSecond': 0.0,
    'processedRowsPerSecond': 0.0}],
  'sink': {'description': 'DeltaSink[../data/wine/wine_delta]',
   'numOutputRows': -1}}]

## What did we do?
We ran a streaming query that wrote the data from the source folder into a Delta table. Note that `recentProgress` reports `numInputRows: 0` for this first batch; the row count below (and the table history further down) shows that 117969 records were inserted into the Delta table.

We can now move forward with our analysis. Below I load this data with the regular batch reader. The result is an ordinary DataFrame that can be used like any other.
    

In [26]:
df = (spark
     .read
     .format('delta')
     .load(sink_path))

In [27]:
unique_key = ['taster_name', 'title']

In [28]:
df.count()

117969

## More data

We receive more data for our analysis. Let's say that a new month of observations needs to be added. We want to keep track of the new data
without doing additional work to incorporate it. This is where streaming comes to the rescue!


In [29]:
# copy wine_new.csv into the wine_source folder that the stream reads from
!copy "C:\Spark\rawdata\wine\wine_new.csv" "C:\Spark\rawdata\wine\wine_source\wine_new.csv"

        1 file(s) copied.


In [30]:
saveloc = sink_path
streamQuery = (data
              .writeStream
              .format('delta')
              .option('checkpointLocation', f"{saveloc}/_checkpoint")
              .trigger(once=True)
              .start(saveloc)
              )

In [32]:
streamQuery.recentProgress

[{'id': '224a9794-bf0b-4788-9cfe-86f9e91a8b6e',
  'runId': 'da168ca5-5ad2-427f-ba8e-cd3b42d9418a',
  'name': None,
  'timestamp': '2021-11-10T18:26:49.143Z',
  'batchId': 1,
  'numInputRows': 1000,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 140.98406880022557,
  'durationMs': {'addBatch': 4810,
   'getBatch': 18,
   'latestOffset': 798,
   'queryPlanning': 15,
   'triggerExecution': 7093,
   'walCommit': 779},
  'stateOperators': [],
  'sources': [{'description': 'FileStreamSource[file:/C:/Spark/rawdata/wine/wine_source]',
    'startOffset': {'logOffset': 0},
    'endOffset': {'logOffset': 1},
    'numInputRows': 1000,
    'inputRowsPerSecond': 0.0,
    'processedRowsPerSecond': 140.98406880022557}],
  'sink': {'description': 'DeltaSink[../data/wine/wine_delta]',
   'numOutputRows': -1}}]

In [33]:
df = (spark
     .read
     .format('delta')
     .load(sink_path))

In [34]:
df.count()

118969

## What happened?

We reran the streaming query with the same checkpoint location. It remembered that it had already processed the file behind the first 117969 rows and only added the 1,000 new rows (`numInputRows: 1000`; 117969 + 1000 = 118969).

This is very helpful if you expect to receive more updates as data becomes available.
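The query can skip files it has already seen because the checkpoint records which source files were processed. The idea can be pictured with a toy model: the hypothetical `run_once` below keeps a JSON list of processed file names and, like a `trigger(once=True)` run, picks up only files that are not on that list. This is a simplified sketch of the concept, not Spark's implementation; Spark's real checkpoint directory uses its own, richer format.

```python
import json
import tempfile
from pathlib import Path


def run_once(source_dir: Path, checkpoint_file: Path, sink: list) -> list:
    """Append the rows of every file in source_dir that no earlier run processed.

    Toy model: the "checkpoint" is a JSON list of already processed file names.
    """
    seen = set(json.loads(checkpoint_file.read_text())) if checkpoint_file.exists() else set()
    new_files = sorted(p.name for p in source_dir.iterdir() if p.is_file() and p.name not in seen)
    for name in new_files:
        rows = (source_dir / name).read_text().splitlines()[1:]  # skip the CSV header
        sink.extend(rows)
    # record the files as processed only after their rows were written
    checkpoint_file.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "wine_source"
    source.mkdir()
    checkpoint = Path(tmp) / "checkpoint.json"  # kept outside the source folder
    table = []  # stands in for the Delta table

    (source / "wine.csv").write_text("title,points\nA,88\nB,90\n")
    first = run_once(source, checkpoint, table)

    (source / "wine_new.csv").write_text("title,points\nC,85\n")
    second = run_once(source, checkpoint, table)

    third = run_once(source, checkpoint, table)  # nothing new to pick up

print(first, second, third, len(table))  # ['wine.csv'] ['wine_new.csv'] [] 3
```

Deleting the checkpoint file in this model would make the next run reprocess everything, which mirrors why the first cell of the notebook removes the whole `wine_delta` folder, checkpoint included.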

## Delta Table
As we inserted the data into a Delta table, we now have more options available.

First of all, there is the history of the Delta table. It shows which versions have existed, and we can load any of them as required. To see the history:


In [35]:
import delta.tables as dtables
delta_df = dtables.DeltaTable.forPath(spark, sink_path)

In [36]:
history = delta_df.history().take(5)
for row in history:
    print(row)

Row(version=1, timestamp=datetime.datetime(2021, 11, 10, 19, 26, 51, 570000), userId=None, userName=None, operation='STREAMING UPDATE', operationParameters={'epochId': '1', 'outputMode': 'Append', 'queryId': '224a9794-bf0b-4788-9cfe-86f9e91a8b6e'}, job=None, notebook=None, clusterId=None, readVersion=0, isolationLevel=None, isBlindAppend=True, operationMetrics={'numOutputRows': '1000', 'numRemovedFiles': '0', 'numAddedFiles': '1', 'numOutputBytes': '61230'}, userMetadata=None)
Row(version=0, timestamp=datetime.datetime(2021, 11, 10, 19, 25, 58, 639000), userId=None, userName=None, operation='STREAMING UPDATE', operationParameters={'epochId': '0', 'outputMode': 'Append', 'queryId': '224a9794-bf0b-4788-9cfe-86f9e91a8b6e'}, job=None, notebook=None, clusterId=None, readVersion=None, isolationLevel=None, isBlindAppend=True, operationMetrics={'numOutputRows': '117969', 'numRemovedFiles': '0', 'numAddedFiles': '5', 'numOutputBytes': '5298445'}, userMetadata=None)


We see that two versions exist, each with a version number, a timestamp and a `numOutputRows` entry in `operationMetrics` showing what actually happened. To load a particular version, use either a version number or a timestamp:

In [37]:
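df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(sink_path)  # by version number
ts_v0 = delta_df.history().filter("version = 0").first()["timestamp"]  # commit time of version 0
df_ts = spark.read.format("delta").option("timestampAsOf", str(ts_v0)).load(sink_path)  # by timestamp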
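Both options are resolved against the Delta transaction log (`_delta_log`), where every version is a commit that adds and logically removes data files; removed files stay on disk until `VACUUM` deletes them, which is what makes loading an old version possible. The toy model below illustrates that idea. It is not the delta library: `Commit`, `files_as_of_version` and `version_as_of_timestamp` are hypothetical names, the file counts and commit times mirror the history output above (5 files, then 1, then the MERGE further down that removes 6 and adds 200), and the file names are made up. Real Delta also uses log checkpoints and handles edge cases this sketch ignores.

```python
from bisect import bisect_right
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Commit:
    """A simplified commit: one entry per table version in _delta_log."""
    version: int
    timestamp: datetime
    add: list = field(default_factory=list)     # data files added in this version
    remove: list = field(default_factory=list)  # data files logically removed


def files_as_of_version(log, version):
    """Replay commits 0..version (log sorted by version) and return the live files."""
    live = set()
    for commit in log:
        if commit.version > version:
            break
        live -= set(commit.remove)
        live |= set(commit.add)
    return live


def version_as_of_timestamp(log, ts):
    """Return the latest version committed at or before ts."""
    idx = bisect_right([c.timestamp for c in log], ts) - 1
    if idx < 0:
        raise ValueError("timestamp is before the first commit")
    return log[idx].version


v0_files = [f"v0-part-{i}.parquet" for i in range(5)]
v1_files = ["v1-part-0.parquet"]
v2_files = [f"v2-part-{i}.parquet" for i in range(200)]
log = [
    Commit(0, datetime(2021, 11, 10, 19, 25, 58, 639000), add=v0_files),
    Commit(1, datetime(2021, 11, 10, 19, 26, 51, 570000), add=v1_files),
    Commit(2, datetime(2021, 11, 10, 19, 28, 51, 705000),
           add=v2_files, remove=v0_files + v1_files),
]

print(len(files_as_of_version(log, 1)))  # 6
print(len(files_as_of_version(log, 2)))  # 200
print(version_as_of_timestamp(log, datetime(2021, 11, 10, 19, 27)))  # 1
```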

# Updates
The approach above works well as more data is added. What can also happen is that you receive updates that change existing data. Let's say that the review score (`points`) was calculated incorrectly for 100 reviews. The bug has been fixed and now we want to update our data. However, we do not want to lose the earlier version of the data, so that the old models can still be reproduced. This is possible in our current setup with the following change:

In [38]:
# copy the file with the 100 corrected reviews into the wine_source folder
!copy "C:\Spark\rawdata\wine\wineupdate.csv" "C:\Spark\rawdata\wine\wine_source\wineupdate.csv"

        1 file(s) copied.


In [39]:
def merge_function(new_data_df, batch_id):
    # good practice: drop duplicate keys within the incoming micro-batch; unique_key identifies a single review and is defined earlier in the notebook
    new_data_df = new_data_df.dropDuplicates(unique_key)
    (delta_df.alias("original")
     .merge(
         new_data_df.alias("new_data"),
         "(original.taster_name = new_data.taster_name AND original.title = new_data.title)"
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute()
    )
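To make the semantics of `merge_function` concrete, here is a pure-Python toy model of the same upsert: matched keys are overwritten (`whenMatchedUpdateAll`), unmatched source rows are appended (`whenNotMatchedInsertAll`). `merge_upsert` and the sample rows are hypothetical; the titles and scores come from the tables in this notebook, except the Benanti correction, which is made up. This is not how Delta executes a MERGE (Delta rewrites the data files that contain matched rows, which is why version 2 in the history below removes files and adds new ones). The model also shows one SQL detail: with `=` in the merge condition, a key containing NULL never matches, so such a row is inserted again rather than updated.

```python
def merge_upsert(target, source, key_cols):
    """Toy model of merge(...).whenMatchedUpdateAll().whenNotMatchedInsertAll().

    Rows are dicts. Like SQL '=', a key containing None never matches,
    so such a source row is inserted instead of updating a target row.
    """
    def key(row):
        return tuple(row[c] for c in key_cols)

    # like dropDuplicates(unique_key): keep one source row per key
    incoming = {}
    for row in source:
        incoming.setdefault(key(row), row)

    merged, matched = [], set()
    for row in target:
        k = key(row)
        if None not in k and k in incoming:
            merged.append(dict(incoming[k]))  # whenMatchedUpdateAll
            matched.add(k)
        else:
            merged.append(row)  # target row copied unchanged
    for k, row in incoming.items():
        if k not in matched:
            merged.append(dict(row))  # whenNotMatchedInsertAll
    return merged


key_cols = ["taster_name", "title"]
target = [
    {"taster_name": "Anne Krebiehl MW", "title": "Willm 2015 Réserve Gewurztraminer (Alsace)", "points": 88},
    {"taster_name": "Roger Voss", "title": "Albert Bichot 2010 Bourgogne", "points": 85},
    {"taster_name": None, "title": "Benanti 2005 Il Monovitigno (Etna)", "points": 88},
]
source = [
    {"taster_name": "Anne Krebiehl MW", "title": "Willm 2015 Réserve Gewurztraminer (Alsace)", "points": 35},
    {"taster_name": "Michael Schachner", "title": "Viña Cobos 2015 Bramare Marchiori Vineyard Chardonnay (Perdriel)", "points": 21},
    {"taster_name": None, "title": "Benanti 2005 Il Monovitigno (Etna)", "points": 90},  # hypothetical
]

merged = merge_upsert(target, source, key_cols)
print([r["points"] for r in merged])  # [35, 85, 88, 21, 90]
```

Note how the Benanti review ends up twice. If rows with a missing `taster_name` should match as well, the merge condition could use Spark SQL's null-safe equality operator `<=>` instead of `=`.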
    

In [43]:
dfn = spark.read.csv(header=True, path='../rawdata/wine/wineupdate.csv', inferSchema=True)
dfn.select(unique_key + ['points']).toPandas().head(4)

|   | taster_name | title | points |
|---|---|---|---|
| 0 | Kerin O’Keefe | Cormòns 2014 Sauvignon (Collio) | 83 |
| 1 | Roger Voss | Pierre Morey 2005 Santenots Premier Cru (Volnay) | 81 |
| 2 | Michael Schachner | Pedro Escudero 2012 Fuente Elvira Verdejo (Rueda) | 35 |
| 3 | Kerin O’Keefe | Poggio al Chiuso 2014 Voltaccia 49 Merlot (Tos... | 53 |


In [44]:
dfn.count()

100

In [45]:
# rerun the stream with the same checkpoint, but upsert each micro-batch via merge_function (foreachBatch) instead of appending
saveloc = sink_path
streamQuery = (data
              .writeStream
              .format('delta')
              .option('checkpointLocation', f"{saveloc}/_checkpoint")
              .foreachBatch(merge_function)
              .trigger(once=True)
              .start(saveloc)
              )

In [51]:
streamQuery.recentProgress

[{'id': '224a9794-bf0b-4788-9cfe-86f9e91a8b6e',
  'runId': 'd50c4b13-4087-4593-be0e-eccb595d787d',
  'name': None,
  'timestamp': '2021-11-10T18:28:04.108Z',
  'batchId': 2,
  'numInputRows': 200,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 3.8220455587830604,
  'durationMs': {'addBatch': 49785,
   'getBatch': 15,
   'latestOffset': 746,
   'queryPlanning': 10,
   'triggerExecution': 52328,
   'walCommit': 693},
  'stateOperators': [],
  'sources': [{'description': 'FileStreamSource[file:/C:/Spark/rawdata/wine/wine_source]',
    'startOffset': {'logOffset': 1},
    'endOffset': {'logOffset': 2},
    'numInputRows': 200,
    'inputRowsPerSecond': 0.0,
    'processedRowsPerSecond': 3.8220455587830604}],
  'sink': {'description': 'ForeachBatchSink', 'numOutputRows': -1}}]

In [52]:
history = delta_df.history().take(5)
for row in history:
    print(row)

Row(version=2, timestamp=datetime.datetime(2021, 11, 10, 19, 28, 51, 705000), userId=None, userName=None, operation='MERGE', operationParameters={'matchedPredicates': '[{"actionType":"update"}]', 'predicate': '((original.`taster_name` = new_data.`taster_name`) AND (original.`title` = new_data.`title`))', 'notMatchedPredicates': '[{"actionType":"insert"}]'}, job=None, notebook=None, clusterId=None, readVersion=1, isolationLevel=None, isBlindAppend=False, operationMetrics={'numOutputRows': '118971', 'numTargetRowsInserted': '2', 'numTargetRowsUpdated': '99', 'numTargetFilesAdded': '200', 'numTargetFilesRemoved': '6', 'numTargetRowsDeleted': '0', 'scanTimeMs': '6767', 'numSourceRows': '100', 'executionTimeMs': '45531', 'numTargetRowsCopied': '118870', 'rewriteTimeMs': '38757'}, userMetadata=None)
Row(version=1, timestamp=datetime.datetime(2021, 11, 10, 19, 26, 51, 570000), userId=None, userName=None, operation='STREAMING UPDATE', operationParameters={'epochId': '1', 'outputMode': 'Append'

In [53]:
df_old = spark.read.format("delta").option("versionAsOf", 1).load(sink_path).toPandas().sort_values(['title'])
df_old['version'] = 1

In [54]:
df_new = spark.read.format("delta").option("versionAsOf", 2).load(sink_path).toPandas().sort_values(['title'])
df_new['version'] = 2

In [55]:
import pandas as pd
dfconcat = pd.concat([df_old, df_new]).drop_duplicates(['title', 'taster_name', 'version'])
compare = dfconcat.set_index(['title', 'taster_name', 'version']).unstack(2)
compare[compare[('points', 1)] != compare[('points', 2)]]

| title | taster_name | country (v1) | country (v2) | points (v1) | points (v2) | price (v1) | price (v2) | province (v1) | province (v2) | region_1 (v1) | region_1 (v2) | region_2 (v1) | region_2 (v2) | taster_twitter_handle (v1) | taster_twitter_handle (v2) | variety (v1) | variety (v2) | winery (v1) | winery (v2) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Aigner 2015 Weinzierlberg Riesling (Kremstal) | Anne Krebiehl MW | Austria | Austria | 92.0 | 44.0 |  |  | Kremstal | Kremstal |  |  |  |  | @AnneInVino | @AnneInVino | Riesling | Riesling | Aigner | Aigner |
| Albert Bichot 2010 Bourgogne | Roger Voss | France | France | 85.0 | 1.0 | 16.0 | 16.0 | Burgundy | Burgundy | Bourgogne | Bourgogne |  |  | @vossroger | @vossroger | Chardonnay | Chardonnay | Albert Bichot | Albert Bichot |
| Alma del Sur 2011 Colección Malbec (Mendoza) | Michael Schachner | Argentina | Argentina | 85.0 | 58.0 | 10.0 | 10.0 | Mendoza Province | Mendoza Province | Mendoza | Mendoza |  |  | @wineschach | @wineschach | Malbec | Malbec | Alma del Sur | Alma del Sur |
| Alta Alella 2014 Reserva Brut Nature Sparkling (Cava) | Michael Schachner | Spain | Spain | 88.0 | 16.0 | 22.0 | 22.0 | Catalonia | Catalonia | Cava | Cava |  |  | @wineschach | @wineschach | Sparkling Blend | Sparkling Blend | Alta Alella | Alta Alella |
| Anam Cara 2014 Nicholas Estate Chardonnay (Chehalem Mountains) | Paul Gregutt | US | US | 87.0 | 81.0 | 22.0 | 22.0 | Oregon | Oregon | Chehalem Mountains | Chehalem Mountains | Willamette Valley | Willamette Valley | @paulgwine | @paulgwine | Chardonnay | Chardonnay | Anam Cara | Anam Cara |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| Vigne Surrau 2014 Sciala Superiore (Vermentino di Gallura) | Kerin O’Keefe | Italy | Italy | 93.0 | 57.0 | 23.0 | 23.0 | Sicily & Sardinia | Sicily & Sardinia | Vermentino di Gallura | Vermentino di Gallura |  |  | @kerinokeefe | @kerinokeefe | Vermentino | Vermentino | Vigne Surrau | Vigne Surrau |
| Villa Sandi 2015 Cartizze Vigna La Rivetta (Valdobbiadene Superiore di Cartizze) | Kerin O’Keefe | Italy | Italy | 91.0 | 26.0 | 45.0 | 45.0 | Veneto | Veneto | Valdobbiadene Superiore di Cartizze | Valdobbiadene Superiore di Cartizze |  |  | @kerinokeefe | @kerinokeefe | Glera | Glera | Villa Sandi | Villa Sandi |
| Viña Cobos 2015 Bramare Marchiori Vineyard Chardonnay (Perdriel) | Michael Schachner |  | Argentina |  | 21.0 |  | 46.0 |  | Mendoza Province |  | Perdriel |  |  |  | @wineschach |  | Chardonnay |  | Viña Cobos |
| Willm 2015 Réserve Gewurztraminer (Alsace) | Anne Krebiehl MW | France | France | 88.0 | 35.0 | 15.0 | 15.0 | Alsace | Alsace | Alsace | Alsace |  |  | @AnneInVino | @AnneInVino | Gewürztraminer | Gewürztraminer | Willm | Willm |
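An outer `merge` on the key columns is another way to line up two versions. It produces flat column names such as `points_v1` / `points_v2` and, with `indicator=True`, a `_merge` column that marks rows present in only one version. A sketch on tiny stand-ins for `df_old` / `df_new`: the Albert Bichot, Willm and Viña Cobos scores are taken from the table above, while the Domaine Philippe Portier row being unchanged in both versions is an assumption for illustration. It assumes the key is unique per version; duplicate keys would multiply rows in the join.

```python
import pandas as pd

keys = ["title", "taster_name"]

old = pd.DataFrame({
    "title": ["Albert Bichot 2010 Bourgogne",
              "Willm 2015 Réserve Gewurztraminer (Alsace)",
              "Domaine Philippe Portier 2007 Quincy"],
    "taster_name": ["Roger Voss", "Anne Krebiehl MW", "Roger Voss"],
    "points": [85, 88, 88],
})
new = pd.DataFrame({
    "title": ["Albert Bichot 2010 Bourgogne",
              "Willm 2015 Réserve Gewurztraminer (Alsace)",
              "Domaine Philippe Portier 2007 Quincy",
              "Viña Cobos 2015 Bramare Marchiori Vineyard Chardonnay (Perdriel)"],
    "taster_name": ["Roger Voss", "Anne Krebiehl MW", "Roger Voss", "Michael Schachner"],
    "points": [1, 35, 88, 21],
})

# full outer join on the key; '_merge' says 'both', 'left_only' or 'right_only'
diff = old.merge(new, on=keys, how="outer", suffixes=("_v1", "_v2"), indicator=True)

# NaN != value is True, so rows that exist in only one version are kept as well
changed = diff[diff["points_v1"] != diff["points_v2"]]
print(changed[keys + ["points_v1", "points_v2", "_merge"]])
```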


In [56]:
title = 'Willm 2015 Réserve Gewurztraminer (Alsace)'
taster_name = 'Anne Krebiehl\xa0MW'  # this name contains a non-breaking space (\xa0), not a regular space

In [57]:
df_old[(df_old.taster_name==taster_name) & (df_old.title==title)]
#df_old[df_old.title==title].taster_name.values

|   | country | points | price | province | region_1 | region_2 | taster_name | taster_twitter_handle | title | variety | winery | version |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 15613 | France | 88 | 15.0 | Alsace | Alsace |  | Anne Krebiehl MW | @AnneInVino | Willm 2015 Réserve Gewurztraminer (Alsace) | Gewürztraminer | Willm | 1 |


In [58]:
df_new[(df_new.taster_name==taster_name) & (df_new.title==title)]

|   | country | points | price | province | region_1 | region_2 | taster_name | taster_twitter_handle | title | variety | winery | version |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 90164 | France | 35 | 15.0 | Alsace | Alsace |  | Anne Krebiehl MW | @AnneInVino | Willm 2015 Réserve Gewurztraminer (Alsace) | Gewürztraminer | Willm | 2 |


In [59]:
dfconcat[(dfconcat.taster_name==taster_name) & (dfconcat.title==title)]

|   | country | points | price | province | region_1 | region_2 | taster_name | taster_twitter_handle | title | variety | winery | version |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 15613 | France | 88 | 15.0 | Alsace | Alsace |  | Anne Krebiehl MW | @AnneInVino | Willm 2015 Réserve Gewurztraminer (Alsace) | Gewürztraminer | Willm | 1 |
| 90164 | France | 35 | 15.0 | Alsace | Alsace |  | Anne Krebiehl MW | @AnneInVino | Willm 2015 Réserve Gewurztraminer (Alsace) | Gewürztraminer | Willm | 2 |


# Without Spark

As mentioned, it is now possible to read Delta tables in Python without a Spark cluster or any PySpark code, using the `deltalake` package. This is shown below.

Writing is not yet possible (as of 11-12-2021); for notes and docs see https://github.com/delta-io/delta-rs

In [61]:
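from deltalake import DeltaTable

dt = DeltaTable(sink_path)  # loads the table state from its transaction log, no Spark involved
dt.version()                # latest version number of the table
dt.load_version(0)          # time travel: point dt at version 0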

In [62]:
df = dt.to_pyarrow_table().to_pandas()
df

|   | country | points | price | province | region_1 | region_2 | taster_name | taster_twitter_handle | title | variety | winery |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Italy | 88 | 41.0 | Sicily & Sardinia | Etna |  |  |  | Benanti 2005 Il Monovitigno (Etna) | Nerello Mascalese | Benanti |
| 1 | US | 88 | 60.0 | Oregon | Willamette Valley | Willamette Valley | Paul Gregutt | @paulgwine | Benton-Lane 2006 First Class Pinot Noir (Willa... | Pinot Noir | Benton-Lane |
| 2 | France | 88 | 26.0 | Southwest France | Montravel |  | Roger Voss | @vossroger | Château Puy-Servain 2003 Vieilles Vignes (Mon... | Bordeaux-style Red Blend | Château Puy-Servain |
| 3 | France | 88 | 25.0 | Southwest France | Montravel |  | Roger Voss | @vossroger | Château Puy-Servain 2007 Marjolaine (Montravel) | Bordeaux-style White Blend | Château Puy-Servain |
| 4 | France | 88 | 16.0 | Loire Valley | Quincy |  | Roger Voss | @vossroger | Domaine Philippe Portier 2007 Quincy | Sauvignon Blanc | Domaine Philippe Portier |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 117964 | Germany | 90 | 28.0 | Mosel |  |  | Anna Lee C. Iijima |  | Dr. H. Thanisch (Erben Müller-Burggraef) 2013 ... | Riesling | Dr. H. Thanisch (Erben Müller-Burggraef) |
| 117965 | US | 90 | 75.0 | Oregon | Oregon | Oregon Other | Paul Gregutt | @paulgwine | Citation 2004 Pinot Noir (Oregon) | Pinot Noir | Citation |
| 117966 | France | 90 | 30.0 | Alsace | Alsace |  | Roger Voss | @vossroger | Domaine Gresser 2013 Kritt Gewurztraminer (Als... | Gewürztraminer | Domaine Gresser |
| 117967 | France | 90 | 32.0 | Alsace | Alsace |  | Roger Voss | @vossroger | Domaine Marcel Deiss 2012 Pinot Gris (Alsace) | Pinot Gris | Domaine Marcel Deiss |
