In [1]:
from datetime import date
import requests
import pandas as pd
from pyspark.sql import SparkSession
from bs4 import BeautifulSoup, NavigableString, Tag

URL = "https://thepurplepigchicago.com/drink"
page = requests.get(URL)

soup = BeautifulSoup(page.content, "html.parser")
menu = soup.find_all("div", class_="menu-section")



In [119]:
def get_wines_df():
    sections = []
    items = []
    staging = []
    def clean_extra(extra):
        if extra == None:
            return ''
        else:
            return ' '.join(x.strip() for x in extra.replace('\n','').split('/'))

    for i in menu:
        sections.append((i.find('div',{'class':'menu-section-title'}).text, \
                             zip([x.text.split('|')[1] if '|' in x.text else x.text for x in i.find_all('div',{'class':'menu-item-title'})], \
                            [x.text.replace('|','') for x in i.find_all("div", class_="menu-item-description") if x != None], \
                            [clean_extra(x.text) for x in i.find_all("div", class_="menu-item-price-bottom")])))



    for i in sections:
        category = i[0]
        for items in i[1]:
            staging.append(["PurplePig"]+[category] + [x for x in items])

    dict_holder = []
    key_list = ['origin','section', 'name', 'description','extra']

    for i in staging:
        dict_from_list = dict(zip(key_list, i))
        dict_holder.append(dict_from_list)
    return(pd.DataFrame(dict_holder).apply(lambda x: x.str.strip() if x.dtype == "object" else x))
    

In [142]:
df = get_wines_df()

In [143]:
df['insert_date'] = pd.to_datetime('today').normalize()

In [148]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 544 entries, 0 to 543
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   origin       544 non-null    object        
 1   section      544 non-null    object        
 2   name         544 non-null    object        
 3   description  544 non-null    object        
 4   extra        544 non-null    object        
 5   insert_date  544 non-null    datetime64[ns]
dtypes: datetime64[ns](1), object(5)
memory usage: 25.6+ KB


In [153]:
df['insert_date'].head()

TypeError: 'Column' object is not callable

In [150]:
df.to_parquet('Wines.parquet')

In [112]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [151]:
df = spark.read.parquet('Wines.parquet')

In [152]:
df.show()#printSchema()

+---------+--------------------+--------------------+--------------------+--------------------+-------------------+
|   origin|             section|                name|         description|               extra|        insert_date|
+---------+--------------------+--------------------+--------------------+--------------------+-------------------+
|PurplePig|SOMMELIER WINE SP...|     SAUVIGNON BLANC|Patrick Noël, San...|         $19 $45 $86|2022-06-09 19:00:00|
|PurplePig|WHISKEY OF THE MONTH|NELSON'S "GREEN B...|                  14|Nose: Full and wa...|2022-06-09 19:00:00|
|PurplePig|SPARKLING BY THE ...|CHAMPAGNE (CHARDO...|Drappier, Carte d...|        $22 $53 $101|2022-06-09 19:00:00|
|PurplePig|SPARKLING BY THE ...|           LAMBRUSCO|Carra di Casatico...|         $13 $32 $61|2022-06-09 19:00:00|
|PurplePig|SPARKLING BY THE ...|           XINOMAVRO|Kir-Yianni  Akaki...|         $16 $39 $74|2022-06-09 19:00:00|
|PurplePig|SPARKLING BY THE ...|    PROSECCO (GLERA)|Pitars, Brut, Fri..

In [127]:
df.write.format('delta').save('tmp/delta/')

In [128]:
spark.sql("CREATE TABLE wines USING DELTA LOCATION './tmp/delta'")

DataFrame[]

In [129]:
spark.sql("SELECT * FROM wines WHERE name like '%BLANC%' LIMIT 10;").show()

+---------+--------------------+---------------+--------------------+-----------+--------------------+
|   origin|             section|           name|         description|      extra|         insert_date|
+---------+--------------------+---------------+--------------------+-----------+--------------------+
|PurplePig|SOMMELIER WINE SP...|SAUVIGNON BLANC|Patrick Noël, San...|$19 $45 $86|2022-06-10 09:24:...|
|PurplePig| WHITES BY THE GLASS|SAUVIGNON BLANC|Chateau La Rame, ...|$15 $36 $68|2022-06-10 09:24:...|
+---------+--------------------+---------------+--------------------+-----------+--------------------+



In [130]:
OUTPUT_DELTA_PATH = './tmp/delta/'

spark.sql('CREATE DATABASE IF NOT EXISTS WINES')

spark.sql('''
    CREATE TABLE IF NOT EXISTS WINES.PURPLE_PIG(
        origin string
        , section string
        , name string
        , description string
        , extra string
        , date date
    ) USING DELTA
    LOCATION "{0}"
    '''.format(OUTPUT_DELTA_PATH)
)

AnalysisException: The specified schema does not match the existing schema at tmp/delta.

== Specified ==
root
-- origin: string (nullable = true)
-- section: string (nullable = true)
-- name: string (nullable = true)
-- description: string (nullable = true)
-- extra: string (nullable = true)
-- date: date (nullable = true)


== Existing ==
root
-- origin: string (nullable = true)
-- section: string (nullable = true)
-- name: string (nullable = true)
-- description: string (nullable = true)
-- extra: string (nullable = true)
-- insert_date: timestamp (nullable = true)


== Differences==
- Specified schema is missing field(s): insert_date
- Specified schema has additional field(s): date

If your intention is to keep the existing schema, you can omit the
schema from the create table command. Otherwise please ensure that
the schema matches.
        ;

In [67]:
spark.sql('SELECT split(extra,\' \') FROM WINES.PURPLE_PIG').show()

+--------------------+
| split(extra,  , -1)|
+--------------------+
|     [$19, $45, $86]|
|[Nose:, Full, and...|
|    [$22, $53, $101]|
|     [$13, $32, $61]|
|     [$16, $39, $74]|
|     [$15, $36, $68]|
|     [$15, $36, $68]|
|     [$15, $36, $68]|
|     [$16, $39, $74]|
|     [$14, $33, $63]|
|     [$14, $33, $63]|
|     [$13, $32, $61]|
|     [$11, $27, $51]|
|     [$14, $33, $63]|
|     [$15, $36, $68]|
|     [$13, $32, $61]|
|     [$14, $33, $63]|
|     [$15, $36, $68]|
|     [$16, $39, $74]|
|     [$14, $33, $63]|
+--------------------+
only showing top 20 rows



In [61]:
load_file = spark.read.parquet('Wines.parquet')

In [62]:
load_file.createOrReplaceTempView('wine_load')

In [91]:
spark.sql("""SELECT count(1)
FROM WINES.PURPLE_PIG""").show()

Py4JJavaError: An error occurred while calling o233.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, true, [id=#1344]
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#3631L])
   +- *(1) ColumnarToRow
      +- FileScan parquet wines.purple_pig[] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[file:/home/gary/WebScrapingIntoDelta/tmp/delta], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
	at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:162)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:434)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at jdk.internal.reflect.GeneratedMethodAccessor145.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.base/java.lang.Thread.run(Thread.java:829)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:116)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1476)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:232)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:399)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:390)
	at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:485)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:519)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
	at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:196)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:162)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:106)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:106)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:139)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:137)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:154)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	... 44 more


In [43]:
spark.sql("""SELECT hash(origin,section, name, description)
FROM wine_load""").show()

+----------------------------------------+
|hash(origin, section, name, description)|
+----------------------------------------+
|                              2061198233|
|                              1527710402|
|                              -632903228|
|                              1261612108|
|                                 2099280|
|                             -2124324733|
|                             -1568785439|
|                             -1899088557|
|                             -1618643588|
|                             -1993399701|
|                              -515037650|
|                               -21673382|
|                             -1172576607|
|                               128985245|
|                             -1285102648|
|                              1795557171|
|                              1636546046|
|                               643601468|
|                             -1081905292|
|                              -444497963|
+----------

In [44]:
spark.sql("""MERGE INTO WINES.PURPLE_PIG
USING wine_load
   ON  hash(WINES.PURPLE_PIG.origin,WINES.PURPLE_PIG.section, WINES.PURPLE_PIG.name, WINES.PURPLE_PIG.description) = \
    hash(wine_load.origin,wine_load.section, wine_load.name, wine_load.description)
 WHEN NOT MATCHED THEN
 	  INSERT (origin, section, name, description) VALUES (origin, section, name, description)
""")

DataFrame[]

In [53]:
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, './tmp/delta')

fullHistoryDF = deltaTable.history() 

fullHistoryDF.show(truncate=False)

+-------+-----------------------+------+--------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|version|timestamp              |userId|userName|operation|operationParameters                                                                                                                                                                                                                                                                     |job |notebook|clusterId|readVersion|

In [58]:
display(spark.catalog.listTables("default"))

[Table(name='wines', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='load_worked_hours', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='wine_load', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [59]:
spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/home/gary/WebScrapingIntoDelta/spark-warehouse'),
 Database(name='wines', description='', locationUri='file:/home/gary/WebScrapingIntoDelta/spark-warehouse/wines.db')]

In [101]:
spark.stop()