## Part 1: Connection to LakeFS

In [1]:
# pip install 'lakefs_client==<lakeFS version>' I am using version 0.93.0
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

In [7]:
# https://docs.lakefs.io/integrations/python.html#python-client-documentation
endpoint = 'http://127.0.0.1:8000/api/v1'
accessKey = 'AKIAJZYUWR6DJFEX3CXQ'
secretKey = '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'

# Create a LakeFS client using LakeFS python API
# Cannot interface with the client without running LakeFS on a Docker container
configuration = lakefs_client.Configuration()
configuration.username = accessKey
configuration.password = secretKey
configuration.host = endpoint

client = LakeFSClient(configuration)

### *Interacting with LakeFS*
*These are just examples, do not run*

https://pydocs.lakefs.io/

In [3]:
repo = models.RepositoryCreation(name='my-repo', storage_namespace='s3://cisc499', default_branch='main')
client.repositories.create_repository(repo)
# output:
# {'creation_date': 1617532175,
#  'default_branch': 'main',
#  'id': 'example-repo',
#  'storage_namespace': 's3://storage-bucket/repos/example-repo'}

{'creation_date': 1681440011,
 'default_branch': 'main',
 'id': 'my-repo',
 'storage_namespace': 's3://cisc499'}

In [4]:
client.branches.list_branches('my-repo')

{'pagination': {'has_more': False,
                'max_per_page': 1000,
                'next_offset': '',
                'results': 1},
 'results': [{'commit_id': 'fcd9721dccebe0b97f149bf2d94f65f6b4537c4739310df4f9d5d0b70003aa19',
              'id': 'main'}]}

In [None]:
# Creating a new repo
repo = models.RepositoryCreation(name='example-repo', storage_namespace='local://example-repo', default_branch='main')
client.repositories.create_repository(repo)

# Creating a new branch
client.branches.create_branch(repository='my-repo', branch_creation=models.BranchCreation(name='new-branch', source='main'))

# Deleting object
client.objects.delete_object(repository='my-repo', branch='main', path='/Users/funnypiggy/Desktop/MobiAct_Dataset_v2.0/Annotated Data/BSC/BSC_1_1_annotated.csv')

In [10]:
# Uploading file to main branch
filepath = '/Users/funnypiggy/Desktop/MobiAct_Dataset_v2.0/Annotated Data/BSC/BSC_1_1_annotated.csv'
file = 'BSC_1_1_annotated.csv'
with open(filepath, 'rb') as f:
    client.objects.upload_object(repository='my-repo', branch='main', path=file, content=f)

In [11]:
# Commit all uncommitted changes
message = "Added a CSV file"
client.commits.commit(
    repository='my-repo',
    branch='main',
    commit_creation=models.CommitCreation(message=message, metadata={'using': 'python_api'})
)

{'committer': 'admin',
 'creation_date': 1681440862,
 'id': '8a8fe18ac90be19e02d5e3c7907d26256975ac097ea143855a8c4954aec0f4c7',
 'message': 'Added a CSV file',
 'meta_range_id': '',
 'metadata': {'using': 'python_api'},
 'parents': ['fcd9721dccebe0b97f149bf2d94f65f6b4537c4739310df4f9d5d0b70003aa19']}

## Part 2: Querying

Using Apache Spark to manipulate data in LakeFS: https://docs.lakefs.io/integrations/spark.html, https://docs.lakefs.io/reference/spark-client.html

The following link has an example to run a Spark context "local": https://docs.lakefs.io/quickstart/iso_env.html

*Helpful links (I used PySpark rather than Spark with Scala):*

https://sparkbyexamples.com/spark/spark-web-ui-understanding/

https://sparkbyexamples.com/spark/install-apache-spark-on-mac/

In [5]:
from pyspark.sql import SparkSession

In [6]:
# Run a pyspark shell with the LakeFS package in cli using the line below
# Not sure if the numbers at the end, i.e. 0.1.12, has to be updated
# pyspark --packages io.lakefs:hadoop-lakefs-assembly:0.1.12

# Other reference page to spark-client from LakeFS contains other package
# versions, but they cause errors when run:
# https://docs.lakefs.io/reference/spark-client.html

# Not sure if there is a way to add the package above in SparkSession.Builder with .config()
# Possible solution: https://stackoverflow.com/questions/35762459/add-jar-to-standalone-pyspark

# Get or create a SparkSession object
spark = SparkSession.builder.appName('query_test').master('local').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/13 22:47:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/13 22:47:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [9]:
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "ACCESSKEYHERE")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "SECRETKEYHERE")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "https://s3.us-east-2.amazonaws.com")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.access.key", accessKey)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.secret.key", secretKey)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.endpoint", endpoint)

In [7]:
# Configure lakeFS Hadoop FileSystem for Spark to read and write objects directly from s3
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.access.key", accessKey)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.secret.key", secretKey)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.lakefs.endpoint", endpoint)
# Configure spark access to s3 endpoint pointing to LakeFS
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", accessKey)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secretKey)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", endpoint)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")

In [12]:
# Links to possible solutions for this block:
# https://www.edureka.co/community/63721/filestreamsink-illegalargumentexception-unknownhostexception
# https://lakefs.io/blog/databricks-lakefs-integration-tutorial/

# https://sparkbyexamples.com/pyspark/pyspark-sql-with-examples/
repo = 'my-repo'
branch = 'main'
file = 'BSC_1_1_annotated.csv'
lakefs_filepath = f'lakefs://{repo}/{branch}/{file}'

df = spark.read.option('header',True).csv(lakefs_filepath).createOrReplaceTempView('Example')
df.printSchema()
df.show(5)

# df.select('*').show(5)
# OR
# spark.sql('SELECT * FROM Example').show(5)

23/04/13 22:54:49 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: lakefs://my-repo/main/BSC_1_1_annotated.csv.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRel

Py4JJavaError: An error occurred while calling o34.csv.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:752)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:750)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:579)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:537)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: java.lang.ClassNotFoundException: Class io.lakefs.LakeFSFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 27 more
