# Example of running OCR in streaming mode to process PDFs
## Install the spark-ocr Python package
You need to specify either the path to `spark-ocr-assembly-[version].jar` or the `secret`.

In [1]:
# Location handed to start() as jar_path (see spark-ocr-assembly-[version].jar)
spark_ocr_jar_path = "../../target/scala-2.11"

# Spark OCR secret and license key: fill in your own values
secret = ""
license = ""

# The version is whatever precedes the first "-" in the secret
version = secret.partition("-")[0]

In [None]:
%%bash
# Colab-only setup: the branch runs only where the google.colab module can be imported.
if python -c 'import google.colab' > /dev/null 2>&1; then
    printf '%s\n' "Run on Google Colab!" "Install Open JDK"
    # -qq together with the stdout redirect keeps the apt-get output quiet
    apt-get install -y -qq openjdk-8-jdk-headless > /dev/null
    java -version
fi

In [None]:
import sys
import os

# On Google Colab, point JAVA_HOME at the OpenJDK 8 install and put its bin/ first on PATH
if 'google.colab' in sys.modules:
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["PATH"] = "{}/bin:{}".format(
        os.environ["JAVA_HOME"],
        os.environ["PATH"],
    )

In [None]:
# install from PyPI using the secret
#%pip install spark-ocr==$version+spark30 --extra-index-url=https://pypi.johnsnowlabs.com/$secret --upgrade

In [None]:
# or install from local path
# %pip install ../../python/dist/spark-ocr-1.9.0.spark24.tar.gz

## Initialization of spark session

In [2]:
from pyspark.sql import SparkSession
from sparkocr import start

# Export the license through the environment only when one was filled in above
if license:
    os.environ['JSL_OCR_LICENSE'] = license

# Start the Spark session with Spark OCR, using the secret and/or the local jar path
spark = start(
    jar_path=spark_ocr_jar_path,
    secret=secret,
)

# Display the session as the cell output
spark

SparkConf Configured, Starting to listen on port: 63045
JAR PATH:/usr/local/lib/python3.7/site-packages/sparkmonitor/listener.jar


In [2]:
from pyspark.ml import PipelineModel
from pyspark.sql.functions import *

from sparkocr.transformers import *

In [10]:
# Glob pattern matching the PDF files to process; replace it with the path to your own folder.
# The same pattern is used for the schema-inference read and for the streaming read below.
dataset_path = "data/pdfs/*.pdf"

In [11]:
# Static (non-streaming) read limited to one row; it is only needed to obtain the schema
pdfs_df = (
    spark.read
    .format("binaryFile")
    .load(dataset_path)
    .limit(1)
)

## Define OCR pipeline

In [5]:
# Stage 1: convert the binary PDF content into images, written to the "image" column
pdf_to_image = PdfToImage()
pdf_to_image.setOutputCol("image")

# Stage 2: recognize text on each image and write it to the "text" column
ocr = ImageToText()
ocr.setConfidenceThreshold(60)  # NOTE(review): presumably a 0-100 confidence score; confirm
ocr.setOutputCol("text")
ocr.setInputCol("image")

# Assemble both stages, in order, into a ready-to-use pipeline
ocr_stages = [pdf_to_image, ocr]
pipeline = PipelineModel(stages=ocr_stages)

## Define streaming pipeline and start it
Note: each start erases the previous results.

In [13]:
# number of files picked up per micro-batch
maxFilesPerTrigger = 4

# Read the PDFs as a stream of binary files, reusing the schema of the static read
pdf_stream_df = (
    spark.readStream
    .format("binaryFile")
    .schema(pdfs_df.schema)
    .option("maxFilesPerTrigger", maxFilesPerTrigger)
    .load(dataset_path)
)

# Run the OCR pipeline on the stream and add a "timestamp" column from current_timestamp()
result = (
    pipeline
    .transform(pdf_stream_df)
    .withColumn("timestamp", current_timestamp())
)

# Write the output to an in-memory table that can be queried under the name 'result'
query = (
    result.writeStream
    .format('memory')
    .queryName('result')
    .start()
)

In [14]:
# get the latest progress report of the streaming job
query.lastProgress

{'id': 'a8ed6379-ea1f-42ce-bc7f-8a14aa143e9e',
 'runId': '409d9513-36d3-487c-bd37-4438c0c039d7',
 'name': 'result',
 'timestamp': '2020-02-06T12:51:41.013Z',
 'batchId': 1,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'getOffset': 1, 'triggerExecution': 1},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[file:/Users/nmelnik/IdeaProjects/spark-ocr/workshop/jupyter/data/pdfs/*.pdf]',
   'startOffset': {'logOffset': 0},
   'endOffset': {'logOffset': 0},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'MemorySink'}}

In [12]:
# run this cell to stop the streaming job
query.stop()

## Show results from 'result' table

In [15]:
# number of records in the "result" table (number of processed pages in the results)
spark.table("result").count() 

1

In [16]:
# Preview up to 10 rows of the "result" table, restricted to the most relevant columns
(
    spark.table("result")
    .select("timestamp", "pagenum", "path", "text")
    .show(10)
)

+--------------------+-------+--------------------+--------------------+
|           timestamp|pagenum|                path|                text|
+--------------------+-------+--------------------+--------------------+
|2020-02-06 19:51:...|      0|file:/Users/nmeln...| 

~ (OLD GOLD ST...|
+--------------------+-------+--------------------+--------------------+



## Run streaming job for storing results to disk

In [18]:
# Stream only the recognized text to text files under results/.
# The checkpointLocation option points the query at the checkpointDir folder.
query = (
    result.select("text")
    .writeStream
    .format('text')
    .option("path", "results/")
    .option("checkpointLocation", "checkpointDir")
    .start()
)

In [19]:
# get the latest progress report of the streaming job
query.lastProgress

{'id': '61a29c5f-ac1c-498a-a7c3-2744960cbe98',
 'runId': 'c891f0d3-c34d-44ad-a608-81cc448a0613',
 'name': None,
 'timestamp': '2020-02-06T12:52:55.074Z',
 'batchId': 1,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'getOffset': 1, 'triggerExecution': 1},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[file:/Users/nmelnik/IdeaProjects/spark-ocr/workshop/jupyter/data/pdfs/*.pdf]',
   'startOffset': {'logOffset': 0},
   'endOffset': {'logOffset': 0},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'FileSink[results/]'}}

In [20]:
# run this cell to stop the streaming job
query.stop()

## Read results from disk

In [21]:
# NBVAL_SKIP
# Load the text files written to results/ by the streaming job and preview them
results = (
    spark.read
    .format("text")
    .load("results/*.txt")
)
results.show()

+--------------------+
|               value|
+--------------------+
|                    |
|                    |
|~ (OLD GOLD STRAI...|
|                    |
|control for Sampl...|
|                    |
|/. Length -------...|
|Circumference~-~ ...|
|Paper --------+--...|
|Firmness == aa me...|
|Draw -------~----...|
|Weight on @ ae ee...|
|Tipping Paper -- ...|
|. : Labels ---- O...|
|   Print---~--------|
|C . Filter Length...|
|  . Tear Tape-- Gold|
|. Cartons --- OLD...|
|>=, Requirements:...|
|Laboratory ----- ...|
+--------------------+
only showing top 20 rows



## Clean results and checkpoint folders

In [29]:
%%bash
# Remove the streaming output folder and the checkpoint folder in one call
rm -rf results checkpointDir