# OpenLineage-Spark Demo


Run the following cells and open Marquez at http://127.0.0.1:3000/

### To-dos

- How do you add facets and additional details (e.g., the PySpark code, the source CSV filename, etc.)?
- How do you set the namespace for the events? While we have `.config('spark.openlineage.namespace', 'spark_integration')` for the code, the datasets show up in the `files` namespace in Marquez.

In [None]:
!pip install python-dotenv

In [None]:
import os
from dotenv import load_dotenv

# Load variables from a local .env file (if any) into the process environment.
load_dotenv()
MARQUEZ_HOST = os.getenv('MARQUEZ_HOST')
MARQUEZ_PORT = os.getenv('MARQUEZ_PORT')
# URL derived from the environment. It uses http:// (not https://) so that it
# matches the plain-HTTP endpoint hard-coded below.
OL_URL = 'http://{}:{}'.format(MARQUEZ_HOST, MARQUEZ_PORT)
# For whatever reason the env-derived URL does not work in THIS notebook, for the
# Spark integration only. It works in another project, and this OL_URL works with
# `marquez_client`. Go figure.
# For now, manually find the IP of your host machine (ifconfig, or ipconfig on
# Windows) and put it here. You should be able to reach the Jupyter notebook by
# replacing "localhost" or "127.0.0.1" with that IP.
OL_URL = "http://192.168.86.39:5000"
print('OpenLineage URL = {}'.format(OL_URL))

In [None]:
from pyspark.sql import SparkSession
import urllib.request

# Create (or reuse) a local Spark session that has the OpenLineage listener
# attached and reports lineage events to OL_URL.
spark = (
    SparkSession.builder
    .master('local')
    .appName('openlineage_spark_demo')
    # OpenLineage Spark agent package
    # (an earlier revision used 'io.openlineage:openlineage-spark:0.3.+').
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.6.0')
    # Register the agent's listener with Spark.
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    # Where the listener sends events, and the namespace it reports.
    .config('spark.openlineage.host', OL_URL)
    .config('spark.openlineage.namespace', 'spark_integration')
    .getOrCreate()
)

# Testing Local Read

In [None]:
# Read test.csv with its first row as the header, then preview the rows.
# Open question: spark.read does not seem to trigger an OpenLineage event.
# Shouldn't reads be tracked as well?
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("test.csv")
)
df.show()

In [None]:
# Count rows per 'dependency' value and write the result as CSV in one chained
# expression.
# Observation: this triggers an OpenLineage event whose data source is
# "Notebooks" rather than test.csv, which seems to be a bug.
(
    df.groupBy('dependency')
    .count()
    .write
    .option("header", True)
    .mode("overwrite")
    .csv("dependency_count.csv")
)


In [None]:
# Same aggregation, split into two steps (aggregate, then write) to see whether
# the emitted lineage differs from the single chained expression.
df1 = df.groupBy('dependency').count()
(
    df1.write
    .option("header", True)
    .mode("overwrite")
    .csv("dependency_count2.csv")
)


In [None]:
# Write the already-aggregated df1 once more, to a third output path.
(
    df1.write
    .option("header", True)
    .mode("overwrite")
    .csv("dependency_count3.csv")
)


In [None]:
import shutil

# Count the rows of df and save that single number through the RDD API.
rownum = df.count()
# saveAsTextFile() writes a directory and raises if that directory already
# exists. Clear the previous output first so the cell can be re-run, like the
# mode("overwrite") CSV writes in the cells above.
# Assumes the relative path resolves to the local filesystem (master is 'local');
# confirm if a different default filesystem is configured.
shutil.rmtree("rownum.txt", ignore_errors=True)
spark.sparkContext.parallelize([rownum]).coalesce(1).saveAsTextFile("rownum.txt")

# Junk code below

In [None]:
# Scratch: display the type object of None.
type(None)

In [None]:
# Scratch: display the column data types of df.
df.dtypes

In [None]:
# Scratch: display the individual field definitions of df's schema.
df.schema.fields

In [None]:
from pyspark.sql import DataFrame
from pyspark.rdd import RDD

def foo(x):
    """Name the Spark collection type of ``x``.

    Returns "RDD" if ``x`` is an RDD, "DataFrame" if it is a DataFrame, and
    None for anything else. RDD is checked first.
    """
    labelled_types = ((RDD, "RDD"), (DataFrame, "DataFrame"))
    for spark_type, label in labelled_types:
        if isinstance(x, spark_type):
            return label
    return None

# Exercise foo() on an empty RDD built from an in-memory list.
print(foo(spark.sparkContext.parallelize([])))
# expected output: RDD
# ...and on a DataFrame obtained by converting a one-row RDD with toDF().
print(foo(spark.sparkContext.parallelize([("foo", 1)]).toDF()))
# expected output: DataFrame

# Left disabled: isinstance() needs a class (or tuple of classes) as its second
# argument, so passing the dotted class name as a string raises TypeError.
#isinstance(df, "pyspark.sql.dataframe.DataFrame")

In [None]:
!pip install marquez-python

In [None]:
from marquez_client import MarquezClient

# Create a Marquez client pointed at the same base URL used for OpenLineage events.
client = MarquezClient(url=OL_URL)

# List the namespaces; as the cell's last expression the result is displayed.
client.list_namespaces()


In [None]:
!pip install openlineage-python

In [None]:
from openlineage.client.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job
from unittest.mock import MagicMock

# NOTE: this mock is created but not handed to the client below (session=None),
# so presumably the client talks to the real backend instead of the mock.
session = MagicMock()
# Reuse the notebook-wide OL_URL rather than repeating the literal address.
client = OpenLineageClient(url=OL_URL, session=None)

# Emit a minimal START event: event time, run id, job (namespace "openlineage",
# name "job") and producer, with no inputs, outputs or facets.
client.emit(
    RunEvent(
        RunState.START,
        "2020-01-01",
        Run("69f4acab-b87d-4fc0-b27b-8ea950370ff3"),
        Job("openlineage", "job"),
        "producer"
    )
)

In [None]:
# Assert that the most recent call to the mock's post() used the lineage endpoint
# URL, the serialized START event as the body, and timeout=5.0 / verify=True.
# NOTE(review): in this notebook `session` is never passed to an OpenLineageClient
# (the client is built with session=None), and the host asserted here
# (marquez-api2) is not the one that client was given. As written this assertion
# is expected to fail; confirm whether this cell is still wanted.
session.post.assert_called_with(
    "http://marquez-api2:5000/api/v1/lineage",
    '{"eventTime": "2020-01-01", "eventType": "START", "inputs": [], "job": '
    '{"facets": {}, "name": "job", "namespace": "openlineage"}, "outputs": [], '
    '"producer": "producer", "run": {"facets": {}, "runId": '
    '"69f4acab-b87d-4fc0-b27b-8ea950370ff3"}}',
    timeout=5.0,
    verify=True
)

In [None]:
from openlineage.client import constants
from openlineage.client.run import RunEvent
from openlineage.client.serde import Serde

import requests
# Lineage endpoint, derived from the notebook-wide OL_URL. The hard-coded
# duplicate that was immediately overwritten has been dropped.
url = OL_URL + "/api/v1/lineage"
# Headers sent with the raw POST requests below.
# NOTE(review): "charset" is not a standard HTTP header on its own; it is normally
# a parameter of Content-Type. Left unchanged to keep the request identical.
headers = {"charset": "utf-8", "Content-Type": "application/json"}


In [None]:
# Hand-written START event for job "my-job" with a single input dataset,
# posted as raw JSON to the lineage endpoint.
data = """{
        "eventType": "START",
        "eventTime": "2020-12-28T19:52:00.001+10:00",
        "run": {
          "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
        },
        "job": {
          "namespace": "gary-namespace",
          "name": "my-job"
        },
        "inputs": [{
          "namespace": "gary-namespace",
          "name": "my-input"
        }],  
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
      }"""
# A timeout keeps the cell from hanging indefinitely if the backend is unreachable.
r = requests.post(url, data=data, headers=headers, timeout=5.0)
# True when the response status code is below 400.
r.ok

In [None]:
# Hand-written COMPLETE event with the same runId and job as the START event,
# adding one output dataset that carries a schema facet with two VARCHAR fields.
data2 = """{
        "eventType": "COMPLETE",
        "eventTime": "2020-12-28T20:52:00.001+10:00",
        "run": {
          "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
        },
        "job": {
          "namespace": "gary-namespace",
          "name": "my-job"
        },
        "outputs": [{
          "namespace": "gary-namespace",
          "name": "my-output",
          "facets": {
            "schema": {
              "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
              "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
              "fields": [
                { "name": "a", "type": "VARCHAR"},
                { "name": "b", "type": "VARCHAR"}
              ]
            }
          }
        }],     
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
      }"""

# A timeout keeps the cell from hanging indefinitely if the backend is unreachable.
r = requests.post(url, data=data2, headers=headers, timeout=5.0)
# True when the response status code is below 400.
r.ok