## 1. Data Ingestion

![picture](img/picture8.png)

#### Content
* Session Builder
* Structure
* Uploading
* Reading and creating table in Database of interest

#### Objective
* To show users how to upload data and create a table from it in the database

### Session Builder

The notebook uses a sparkmagic kernel that sends code and SQL queries to the Lighter server. The Lighter server forwards them to the Spark server, which sends the results back through the Lighter server.

The following line connects to the Lighter server through the sparkmagic kernel. Supply your own credentials after `--username` and `--password`.

```
%_do_not_call_change_endpoint --username  --password  --server https://lighter-staging.vbrani.aisingapore.net/lighter/api  
```

![picture](img/picture4.png)


This cell specifies the configuration for the Spark session.

```
%%configure -f
{"conf": {
        "spark.sql.warehouse.dir" : "s3a://dataops-example/justin",
        "spark.hadoop.fs.s3a.access.key":"",
        "spark.hadoop.fs.s3a.secret.key": "",
        "spark.kubernetes.container.image": "justinljg/dep:1.08",
        "spark.kubernetes.container.image.pullPolicy" : "Always"
    }
}
```

* `spark.sql.warehouse.dir` sets the default location where databases and managed tables are stored.
* `spark.hadoop.fs.s3a.access.key` and `spark.hadoop.fs.s3a.secret.key` are the AWS access key ID and secret access key used to read from and write to S3.
* `spark.kubernetes.container.image` is the Docker image that contains your dependencies.
* `spark.kubernetes.container.image.pullPolicy` set to `Always` makes Kubernetes pull the image every time a pod starts, so the latest push of the tag is used.
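Because the `%%configure` body has to be valid JSON, a trailing comma or a missing quote is likely to make the cell fail. One option, sketched below under stated assumptions, is to generate the body with Python's `json` module. The helper name `build_configure_payload` is hypothetical, and reading the keys from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables is an assumption (a common convention, not something this notebook sets up). The printed output contains the keys, so avoid committing it.

```python
import json
import os


def build_configure_payload(warehouse_dir: str, image: str) -> str:
    """Return the JSON body for a sparkmagic `%%configure -f` cell.

    Hypothetical helper: credentials come from the (assumed) AWS_ACCESS_KEY_ID
    and AWS_SECRET_ACCESS_KEY environment variables and default to empty
    strings, matching the blank values in the cell above.
    """
    conf = {
        # Default location for databases and managed tables
        "spark.sql.warehouse.dir": warehouse_dir,
        # Credentials for Hadoop's S3A connector
        "spark.hadoop.fs.s3a.access.key": os.environ.get("AWS_ACCESS_KEY_ID", ""),
        "spark.hadoop.fs.s3a.secret.key": os.environ.get("AWS_SECRET_ACCESS_KEY", ""),
        # Image that bundles the session's dependencies
        "spark.kubernetes.container.image": image,
        # Pull the image on every pod start so a re-pushed tag is picked up
        "spark.kubernetes.container.image.pullPolicy": "Always",
    }
    # json.dumps always emits valid JSON (proper quoting, no trailing commas)
    return json.dumps({"conf": conf}, indent=4)


if __name__ == "__main__":
    print(build_configure_payload("s3a://dataops-example/nlp", "justinljg/dep:1.08"))
```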


The cell actually run in this notebook sets the warehouse to `s3a://dataops-example/nlp`, the location used in the rest of this section:

```
%%configure -f
{"conf": {
        "spark.sql.warehouse.dir" : "s3a://dataops-example/nlp",
        "spark.hadoop.fs.s3a.access.key":"",
        "spark.hadoop.fs.s3a.secret.key": "",
        "spark.kubernetes.container.image": "justinljg/dep:1.08",
        "spark.kubernetes.container.image.pullPolicy" : "Always"
    }
}
```

Then import the Delta Lake Python API:

```
from delta import *
```

### Structure

##### MultiHop Architecture
The multi-hop architecture is a common method used in data engineering that involves organizing data into tables based on their quality level in the data pipeline. The three levels of tables are the Bronze tables, which contain raw, unstructured data; the Silver tables, which have been transformed and engineered to include more structure; and the Gold tables, which are used for machine learning training and prediction.

The multi-hop architecture provides a structured approach to data processing and enables data engineers to build a pipeline that starts with raw data as a "single source of truth" and adds structure at each stage. This allows for recalculations and validations of subsequent transformations and aggregations to ensure that business-level aggregate tables reflect the underlying data, even as downstream users refine the data and introduce context-specific structure.
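To make the three hops concrete, here is a minimal sketch in plain Python, with lists of dictionaries standing in for tables. The sample records, field names, and the cleaning and aggregation rules are hypothetical, chosen only for illustration; in this notebook each hop would be a Spark view or table rather than a Python list. The bronze data is never modified, so every later hop can be recomputed from it.

```python
from collections import defaultdict
from statistics import mean

# Bronze: raw records kept exactly as ingested (hypothetical sample data)
bronze = [
    {"userid": "u1", "rating": "5", "text": "Great food "},
    {"userid": "u2", "rating": "", "text": "ok"},
    {"userid": "u1", "rating": "3", "text": "  Slow service"},
]


def to_silver(rows):
    """Silver: drop rows without a rating, cast types and tidy the text."""
    return [
        {"userid": r["userid"], "rating": int(r["rating"]), "text": r["text"].strip()}
        for r in rows
        if r["rating"]
    ]


def to_gold(rows):
    """Gold: aggregate to one value per user, e.g. as a model feature."""
    ratings = defaultdict(list)
    for r in rows:
        ratings[r["userid"]].append(r["rating"])
    return {user: mean(values) for user, values in ratings.items()}


silver = to_silver(bronze)  # new list; bronze is left untouched
gold = to_gold(silver)
print(gold)
```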

![picture](img/picture1.jpg)

![picture](img/picture3.png)

For more information, see the Databricks blog post [Productionizing Machine Learning with Delta Lake](https://www.databricks.com/blog/2019/08/14/productionizing-machine-learning-with-delta-lake.html).

##### Tables and views

This notebook also follows the practice of creating a view to process and manipulate the data before saving it as a bronze, silver or gold table.
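The difference between a view and a table can be seen without Spark. The sketch below uses Python's built-in `sqlite3` module as a stand-in (an analogy, not the Spark API): a temporary view stores only a query, so it reflects later changes to its source, while `CREATE TABLE ... AS SELECT` copies the rows at that moment. In Spark, `createOrReplaceTempView` likewise registers a query plan rather than copying data, and the view lives only for the current session. The table and column names here are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Source data standing in for the ingested Delta table (hypothetical)
cur.execute("CREATE TABLE raw_reviews (userid TEXT, rating INTEGER)")
cur.executemany("INSERT INTO raw_reviews VALUES (?, ?)", [("u1", 5), ("u2", 3)])

# A temporary view stores only the query; no rows are copied
cur.execute("CREATE TEMP VIEW reviews_view_bronze AS SELECT * FROM raw_reviews")

# CREATE TABLE ... AS SELECT materialises the rows as they are right now
cur.execute("CREATE TABLE reviews_table_bronze AS SELECT * FROM reviews_view_bronze")

# A new source row shows up through the view but not in the copied table
cur.execute("INSERT INTO raw_reviews VALUES ('u3', 4)")
view_count = cur.execute("SELECT COUNT(*) FROM reviews_view_bronze").fetchone()[0]
table_count = cur.execute("SELECT COUNT(*) FROM reviews_table_bronze").fetchone()[0]
print(view_count, table_count)  # 3 2
```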

### Uploading

AI Singapore provides a data ingestion platform at https://udp.aisingapore.net/.

Log in with your Google account and upload your data through the UI.

![picture](img/picture15.png)
![picture](img/picture2.png)

The advantage of using this UI is that it converts the uploaded file into Delta format, which users can then read directly. The process is largely fuss-free.

The UI is backed by the Pandan Reservoir service (also called the Delta Service), which runs on Kubernetes and launches PySpark workloads to process uploaded datasets. Each workload reads the raw data from the specified source location and writes the equivalent Delta table to the specified destination.

The overarching idea is that a listener container constantly listens on a specific Kafka topic. When it receives a message, it passes that message as an environment variable to the worker container and runs the worker through the Kubernetes Python API.
Pandan Reservoir is currently deployed automatically through GitLab CI/CD.
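The listener/worker pattern described above can be sketched in plain Python. This is a hypothetical illustration, not Pandan Reservoir's actual code: a list of strings stands in for the Kafka topic, and instead of calling the Kubernetes API the sketch only builds the Pod manifest that would run each worker, with the message passed in an environment variable. The image name `example/delta-worker:latest`, the variable name `MESSAGE` and the message format are assumptions. With the official `kubernetes` Python client, such a manifest could be submitted with `CoreV1Api().create_namespaced_pod(namespace, body)`.

```python
import json
from typing import Iterable, Iterator


def worker_pod_manifest(message: str, image: str = "example/delta-worker:latest") -> dict:
    """Build a Pod spec that runs one worker with the message in its environment."""
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        # generateName lets the API server append a unique suffix to each pod name
        "metadata": {"generateName": "delta-worker-"},
        "spec": {
            "restartPolicy": "Never",  # run the conversion once, do not restart it
            "containers": [
                {
                    "name": "worker",
                    "image": image,
                    # The Kafka message is handed to the worker as an environment variable
                    "env": [{"name": "MESSAGE", "value": message}],
                }
            ],
        },
    }


def listen(topic: Iterable[str]) -> Iterator[dict]:
    """Stand-in for the listener loop: yield one worker manifest per message."""
    for message in topic:
        yield worker_pod_manifest(message)


# A list of JSON strings plays the role of the Kafka topic (hypothetical payload)
fake_topic = [json.dumps({"source": "s3a://bucket/raw/data.csv", "target": "s3a://bucket/delta/data"})]
for pod in listen(fake_topic):
    print(pod["spec"]["containers"][0]["env"][0]["value"])
```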

### Reading and creating table in Database of interest

After the data is uploaded, Pandan Reservoir, the company's ingestion pipeline, creates a Delta table from it. The steps below read the Delta files, create a view from them and then create a table from the view. A temporary view is used because it does not modify any existing table, it stores only the query rather than a copy of the data, and it lasts only for the current session. Combined with the multi-hop architecture, this also makes it easier to trace where an error was introduced.

This code loads the Delta table into a `pyspark.sql.DataFrame`.

[Documentation](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html)

```
df_ingest = spark.read \
    .format("delta") \
    .load("s3a://udptesting1/delta/csv/greview")
```

If you do not have access to the `udptesting1` bucket, example data is available at the path used in the cell below.

```
df_ingest = spark.read \
    .format("delta") \
    .load("s3a://dataops-example/nlp/delta/sparknlp")
```

This line switches to the database you want to work in (here, `SparkNLP`).

[Documentation](https://docs.databricks.com/archive/spark-sql-2.x-language-manual/use-database.html)
```
%%sql

USE SparkNLP;
```

This line registers the DataFrame as a temporary view named `greview_view_bronze`.

[Documentation](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.createOrReplaceTempView.html)
```
df_ingest.createOrReplaceTempView("greview_view_bronze")
```

This line selects everything from the view.

[Documentation](https://docs.databricks.com/archive/spark-sql-2.x-language-manual/select.html)
```
%%sql

SELECT * FROM greview_view_bronze;
```

This statement creates the table `greview_table_bronze` from all rows of the bronze view, unless a table with that name already exists.

[Documentation](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-table-using.html)
```
%%sql

CREATE TABLE IF NOT EXISTS greview_table_bronze
AS SELECT * FROM greview_view_bronze;
```
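One consequence of `IF NOT EXISTS` is worth knowing: rerunning the cell after new data arrives does not refresh the table, because the statement is skipped once the table exists. The sketch below demonstrates this with Python's built-in `sqlite3`, which accepts the same `CREATE TABLE IF NOT EXISTS ... AS SELECT` form; it is an analogy rather than Spark itself, and the table names `source` and `snapshot` are hypothetical. To rebuild the table from fresh data, drop it first (for example with `DROP TABLE IF EXISTS greview_table_bronze`).

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical source data standing in for the bronze view
conn.execute("CREATE TABLE source (userid TEXT, rating INTEGER)")
conn.execute("INSERT INTO source VALUES ('u1', 5)")

ctas = "CREATE TABLE IF NOT EXISTS snapshot AS SELECT * FROM source"
conn.execute(ctas)  # first run: creates the table with 1 row

conn.execute("INSERT INTO source VALUES ('u2', 3)")
conn.execute(ctas)  # second run: the table exists, so nothing happens (no error, no refresh)
rows = conn.execute("SELECT COUNT(*) FROM snapshot").fetchone()[0]
print(rows)  # 1

# Dropping the table first forces a rebuild from the current source
conn.execute("DROP TABLE IF EXISTS snapshot")
conn.execute(ctas)
rebuilt = conn.execute("SELECT COUNT(*) FROM snapshot").fetchone()[0]
print(rebuilt)  # 2
```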

This line lists the tables in the current database.

[Documentation](https://docs.databricks.com/archive/spark-sql-2.x-language-manual/show-tables.html)
```
%%sql

SHOW TABLES;
```

Read the new table back into a DataFrame:

```
df = spark.read.table("greview_table_bronze")
```

Display the DataFrame without truncating long column values:

```
df.show(truncate=False)
```

The output (truncated here) lists the columns `index`, `userid`, `name`, `time`, `rating`, `text`, `pics` and `resp`, among others.

Finally, save the bronze table as Delta files in S3, overwriting any data already at that path:

```
df.write.mode("overwrite") \
    .format("delta") \
    .save("s3a://dataops-example/nlp/delta/greview_table_bronze")
```