# Helium and Delta Lake

The Helium Foundation will soon publish all oracle S3 data files in Delta Lake format, which stores table data as Parquet files. Delta Lake, Parquet, and Spark are widely used tools for storing and querying very large tables. Once the right tools are installed, Delta Lake is easy to use: with Spark, Jupyter, and Delta Lake you can query Helium network data efficiently using standard SQL commands.

With Parquet, the Helium Foundation will be able to provide data that is 30 to 50% smaller than the equivalent raw data in protobuf format. Parquet files also carry an integrated schema, which makes parsing and querying the data much simpler. Parquet is a format of choice among data scientists.

We hope the community enjoys these new data tools and look forward to collaborating and integrating future community contributions.

The following Jupyter notebook shows how to access the Helium Foundation's S3 files and query them with SQL using Spark DataFrames.

## Jupyter Setup Instructions
From a Jupyter terminal, install delta-spark and boto3, then create the AWS credentials file:

```
pip install delta-spark
pip install boto3
mkdir -p /home/jovyan/.aws
# Set your access_key, secret_access_key and session_token
vi /home/jovyan/.aws/credentials
```
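
As a rough illustration of what the credentials file should contain, the sketch below renders one profile in the AWS shared credentials format. The helper name `render_credentials` and the placeholder values are hypothetical; the three key names are the standard ones boto3 reads from the file.

```python
import configparser
import io


def render_credentials(access_key, secret_key, session_token, profile="default"):
    """Return the text of an AWS shared credentials file holding one profile."""
    # interpolation=None so that a '%' inside a secret is written verbatim
    parser = configparser.ConfigParser(interpolation=None)
    parser[profile] = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": session_token,
    }
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


# Placeholder values only; substitute your own credentials.
print(render_credentials("YOUR_ACCESS_KEY_ID", "YOUR_SECRET_ACCESS_KEY", "YOUR_SESSION_TOKEN"))
```

The printed text can be pasted into `/home/jovyan/.aws/credentials`; the `default` profile name matches the `profile_name='default'` used in the notebook.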

## Verify your AWS connection

Verify that your account credentials work and that Jupyter can connect to AWS.

> **Warning**
> AWS queries carry the risk of high S3 egress fees when the data is transferred outside of us-west-2.  
> As a matter of practice, keep the following in mind:
> * Specify a limited date range on all queries.
> * Repeated queries will incur additional cost.
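
One way to make the date-range advice hard to forget is to build queries through a small guard. The following is a minimal sketch, not part of the Helium tooling: `bounded_date_query` and its `max_days` default of 7 are hypothetical, and it assumes a table or view with a `date` column, such as the `packets` view used later in this notebook.

```python
from datetime import date


def bounded_date_query(table, start, end, max_days=7):
    """Build a SELECT limited to an inclusive date range of at most max_days days."""
    if not table.isidentifier():
        raise ValueError(f"unexpected table name: {table!r}")
    span = (end - start).days + 1
    if span < 1:
        raise ValueError("end date is before start date")
    if span > max_days:
        raise ValueError(f"range of {span} days exceeds the limit of {max_days}")
    return (
        f"SELECT * FROM {table} "
        f"WHERE date BETWEEN '{start.isoformat()}' AND '{end.isoformat()}'"
    )


print(bounded_date_query("packets", date(2023, 6, 1), date(2023, 6, 2)))
```

The resulting string can be passed to `spark.sql(...)`. To avoid paying for the same data twice, consider keeping the result in a DataFrame and reusing it rather than re-running the query.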

In [1]:
import boto3

session = boto3.Session(profile_name='default')
s3 = session.client('s3')

#s3.get_bucket_location(Bucket='foundation-data-lake-requester-pays', RequestPayer='requester')
#s3.get_bucket_location(Bucket='foundation-data-lake-requester-pays')

for key in s3.list_objects(Bucket='foundation-data-lake-requester-pays', Prefix='bronze/lora_poc_v1/_delta_log/0000000000000000000', RequestPayer='requester')['Contents']:
    print(key['Key'])

bronze/lora_poc_v1/_delta_log/00000000000000000000.json
bronze/lora_poc_v1/_delta_log/00000000000000000001.json
bronze/lora_poc_v1/_delta_log/00000000000000000002.json
bronze/lora_poc_v1/_delta_log/00000000000000000003.json
bronze/lora_poc_v1/_delta_log/00000000000000000004.json
bronze/lora_poc_v1/_delta_log/00000000000000000005.json
bronze/lora_poc_v1/_delta_log/00000000000000000006.json
bronze/lora_poc_v1/_delta_log/00000000000000000007.json
bronze/lora_poc_v1/_delta_log/00000000000000000008.json
bronze/lora_poc_v1/_delta_log/00000000000000000009.json
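
The keys listed above are entries in the table's Delta Lake transaction log, where each commit file is named after its table version, zero-padded to 20 digits. As a small sketch (the helper name `delta_log_version` is hypothetical), the version can be recovered from a key like this:

```python
import posixpath


def delta_log_version(key):
    """Return the commit version encoded in a _delta_log JSON key, or None."""
    stem, ext = posixpath.splitext(posixpath.basename(key))
    # Commit files look like <20 digits>.json; anything else (for example
    # checkpoint files) is ignored.
    if ext != ".json" or len(stem) != 20 or not (stem.isascii() and stem.isdigit()):
        return None
    return int(stem)


# Rebuild two of the keys from the listing above instead of retyping the zeros.
keys = [f"bronze/lora_poc_v1/_delta_log/{version:020d}.json" for version in (0, 9)]
print([delta_log_version(k) for k in keys])  # prints [0, 9]
```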


## Start Spark Session

### Known Issues

The bucket is a requester-pays bucket, so every S3 request must carry the requester-pays header. This is not supported until Spark 3.4.2 / Hadoop 3.3.5.

https://github.com/jupyter/docker-stacks/issues/1937

In [8]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder.master("local[*]") \
    .appName("PySparkLocal") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

my_packages = ["org.apache.hadoop:hadoop-aws:3.3.4"]
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.conf.set("spark.hadoop.fs.s3a.requester.pays.enabled", "true")
spark.conf.set("spark.hadoop.delta.enableFastS3AListFrom", "true")

# Spark 3.4.2 or higher is required
print(f"Spark version = {spark.version}")

# Hadoop 3.3.5 or higher is required to support S3 requester pays
print(f"Hadoop version = {spark._sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")

Spark version = 3.4.1
Hadoop version = 3.3.4
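
The versions printed above fall short of the minimums stated under Known Issues. A minimal sketch of checking this programmatically follows; `parse_version` and `meets_minimum` are hypothetical helpers, and the version strings are the ones shown in this notebook.

```python
import re


def parse_version(text):
    """Turn a dotted version such as '3.4.1' into a tuple of ints, e.g. (3, 4, 1)."""
    return tuple(int(n) for n in re.findall(r"\d+", text)[:3])


def meets_minimum(actual, minimum):
    """Return True if the actual version is at least the minimum version."""
    return parse_version(actual) >= parse_version(minimum)


# Versions reported by the cell above, compared with the Known Issues minimums.
print(meets_minimum("3.4.1", "3.4.2"))  # Spark: prints False
print(meets_minimum("3.3.4", "3.3.5"))  # Hadoop: prints False
```

Comparing tuples of integers avoids the trap of comparing version strings, where `"3.10.0" < "3.4.2"` would be true.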


## Read Table from Delta Lake

In [4]:
delta_table_uri = "s3a://foundation-data-lake-requester-pays/bronze/packet_router_packet_report_v1/"
# createOrReplaceTempView returns None, so keep the DataFrame and register the view separately
df = spark.read.format("delta").load(delta_table_uri)
df.createOrReplaceTempView("packets")
sqlDF = spark.sql("SELECT * FROM packets WHERE date = '2023-06-01'")
sqlDF.show()

+------------+---+------+----+---------+-----+--------+------+--------------------+--------------------+------------+-----+------+------------------+----------+--------------------+
|gateway_tmst|oui|net_id|rssi|frequency|  snr|datarate|region|             gateway|        payload_hash|payload_size| free|  type|received_timestamp|      date|                file|
+------------+---+------+----+---------+-----+--------+------+--------------------+--------------------+------------+-----+------+------------------+----------+--------------------+
|   658952164|  1|    36|-101|865402500|  0.8|        | IN865|[00 E7 36 B0 62 B...|[AC 47 D5 30 D3 E...|          23|false|      |     1685577861837|2023-06-01|s3://foundation-p...|
|  1606199439| 12|    36|-125|905300000|-12.8|SF9BW125|      |[00 E2 70 E6 BC 3...|[AB 37 01 00 F7 7...|          24|false|uplink|     1685578035208|2023-06-01|s3://foundation-p...|
|  2993796500| 12|    36|-140|868500000|-22.0|        | EU868|[00 BC 08 55 20 C...|[82 AF 

## Query Table using SQL

In [6]:
spark.sql("""\
SELECT payload_size
FROM packets
WHERE date = '2023-06-01'
LIMIT 5
""").show(truncate=False)

+------------+
|payload_size|
+------------+
|23          |
|24          |
|23          |
|23          |
|23          |
+------------+



In [7]:
sqlDF.printSchema()

root
 |-- gateway_tmst: decimal(23,0) (nullable = true)
 |-- oui: decimal(23,0) (nullable = true)
 |-- net_id: long (nullable = true)
 |-- rssi: integer (nullable = true)
 |-- frequency: long (nullable = true)
 |-- snr: float (nullable = true)
 |-- datarate: string (nullable = true)
 |-- region: string (nullable = true)
 |-- gateway: binary (nullable = true)
 |-- payload_hash: binary (nullable = true)
 |-- payload_size: long (nullable = true)
 |-- free: boolean (nullable = true)
 |-- type: string (nullable = true)
 |-- received_timestamp: decimal(23,0) (nullable = true)
 |-- date: date (nullable = false)
 |-- file: string (nullable = true)
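
The schema stores `received_timestamp` as `decimal(23,0)`. The sample value shown earlier, 1685577861837, appears to be a Unix timestamp in milliseconds; that is an assumption rather than something stated in the schema, but it lines up with the row's `date` of 2023-06-01. Under that assumption, a value collected from a row (PySpark returns decimal columns as `decimal.Decimal`) can be converted like this, using the hypothetical helper `epoch_millis_to_utc`:

```python
from datetime import datetime, timezone
from decimal import Decimal


def epoch_millis_to_utc(value):
    """Convert an epoch-milliseconds value (int or Decimal) to an aware UTC datetime."""
    seconds, millis = divmod(int(value), 1000)
    # Build from whole seconds, then attach the millisecond part exactly.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=millis * 1000)


# Sample value taken from the table output earlier in this notebook.
print(epoch_millis_to_utc(Decimal("1685577861837")).isoformat())
# prints 2023-06-01T00:04:21.837000+00:00
```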



## Working with Local Files

Working with S3 can incur egress costs.
If you would rather work with a local filesystem than with S3, install the AWS CLI.
Then sync the Helium S3 files to the local file system under ../work/s3:

```
aws s3 sync s3://foundation-data-lake-requester-pays/bronze/packet_router_packet_report_v1/ /home/jovyan/work/s3 --request-payer requester --exclude "date=1970-01-01/*"
```

In [None]:
df_local = spark.read.format("delta").load("/home/jovyan/work/s3")
df_local.show()
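
Before loading, it can help to confirm that the sync actually produced a Delta table, since Spark needs the `_delta_log` directory alongside the Parquet files. The check below is a minimal sketch; `looks_like_delta_table` is a hypothetical helper, and the path is the one used in the cell above.

```python
from pathlib import Path


def looks_like_delta_table(path):
    """Return True if path holds a _delta_log directory with at least one JSON commit."""
    log_dir = Path(path) / "_delta_log"
    return log_dir.is_dir() and any(log_dir.glob("*.json"))


# Prints True only once the sync has copied the transaction log locally.
print(looks_like_delta_table("/home/jovyan/work/s3"))
```

If this prints `False`, the sync was probably run from a different directory or excluded the log files.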

## References

### Delta Lake

* https://delta.io/
* https://sparkbyexamples.com/pyspark/pyspark-read-and-write-parquet-file/?expand_article=1
* https://confluence.technologynursery.org/display/TDA/Jupyter+Notebook+for+Playing+with+Spark%2C+Python%2C+R%2C+Scala
* https://jupyter-docker-stacks.readthedocs.io/en/latest/using/selecting.html#jupyter-pyspark-notebook

### Spark
* https://nbviewer.org/github/almond-sh/examples/blob/master/notebooks/spark.ipynb