<a href="https://colab.research.google.com/github/ChetanKnowIt/BDT_Notes/blob/main/HBase_Pyspark_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<hr />
<h1 align="center" > PySpark⭐ HBase🐬 connectivity </h1>

<hr />

## Requirements/Prerequisites: 

  1. Fully Distributed mode installation (Multinode Hadoop environment + HBase installation) from here [Guru99 installtion link](https://www.guru99.com/hbase-installation-guide.html) 
  2. pip install pyspark 
  3. pip install happybase
  4. pip install Flask

<hr />







In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=df0fda8b25a6f7c2a09bdcee18495e6a18dbb9a1caa305e20d5176bdd482a415
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [3]:
pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import findspark
findspark.init()

In [5]:
!which python3


/usr/bin/python3


In [6]:
import sys
print(sys.executable)


/usr/bin/python3


In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('myAppName') \
    .config('spark.executorEnv.PYTHON_EXECUTABLE', '/usr/bin/python3') \
    .getOrCreate()
sc = spark.sparkContext


In [16]:
print(sc.getConf().get('spark.executorEnv.PYTHON_EXECUTABLE'))

None


In [17]:
python_exec = sc.getConf().get('spark.executorEnv.PYTHON_EXECUTABLE')
print(python_exec)

None


In [21]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("my_app") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()


## Flow: 


In [None]:
from IPython.display import Image
Image(url="https://drive.google.com/u/0/uc?id=12AWXzV0JAdP0uZeo7lrdt6_v3Po19bL9")

### 1. We can start with reading file from HDFS, processing 

In [None]:
import pyarrow as pa
import pyarrow.csv as csv
import pandas as pd

# Define the HDFS file path
hdfs_path = 'hdfs://my_cluster:8020/my_file.csv'

# Create a PyArrow HDFS filesystem object
fs = pa.hdfs.connect()

# Open the HDFS file as a memory buffer
with fs.open(hdfs_path) as file:
    buffer = pa.BufferReader(file.read())

# Read the CSV data from the memory buffer using PyArrow
table = csv.read_csv(buffer)

# Convert the PyArrow Table to a Pandas DataFrame
data = table.to_pandas()


### and dumping on HBase from pandas 
 https://happybase.readthedocs.io/en/latest/user.html example table.put('2',{'f1': 'hey'})

In [None]:
import happybase
import pandas as pd

# Load data into a Pandas DataFrame
data = pd.read_csv('my_data.csv')

# Create a connection object to HBase
connection = happybase.Connection('localhost', port=9090)

# Create a table object in Happybase
table_name = 'my_table'
column_family = 'my_cf'
table = connection.table(table_name)

# Iterate over the rows in the Pandas DataFrame
for _, row in data.iterrows():
    # Extract data from the row
    row_key = row['my_key_column']
    data_dict = {
        f'{column_family}:{column}': str(row[column])
        for column in data.columns
        if column != 'my_key_column'
    }
    
    # Write the row to HBase
    table.put(row_key, data_dict)


### 2. then we retrieve values with this example below

In [None]:
import happybase
from pyspark.sql import SparkSession

# Create a HBase connection object
connection = happybase.Connection('localhost', port=9090)

# Create a table object
table = connection.table('my_table')

# Retrieve data from HBase table
data = []
for key, values in table.scan():
    row = {}
    row['key'] = key
    for column, value in values.items():
        row[column.decode('utf-8')] = value.decode('utf-8')
    data.append(row)

# Convert HBase data to PySpark DataFrame
spark = SparkSession.builder.appName('HBase to PySpark').getOrCreate()
df = spark.createDataFrame(data)

# Print PySpark DataFrame
df.show()


### 3. we use this spark dataframe for model training and evaluation 
### 4. we dump data back to HBase with this example below

In [None]:
import happybase
import pickle
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

# Train a logistic regression model in Spark
spark = SparkSession.builder.appName('Logistic Regression').getOrCreate()
data = spark.read.csv('my_data.csv', header=True, inferSchema=True)
assembler = VectorAssembler(inputCols=['feature1', 'feature2'], outputCol='features')
data = assembler.transform(data)
train, test = data.randomSplit([0.7, 0.3])
lr = LogisticRegression()
model = lr.fit(train)
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions)

# Serialize the results
results = {'auc': auc}
serialized_results = pickle.dumps(results)

# Create a connection object to HBase
connection = happybase.Connection('localhost', port=9090)

# Create a table object in Happybase
table_name = 'my_table'
table = connection.table(table_name)

# Write the results to HBase
row_key = 'my_row'
column_family = 'results'
column_qualifier = 'logistic_regression'
table.put(row_key, {f'{column_family}:{column_qualifier}': serialized_results})


### 5. Finally we can use Flask to render this table onto an HTML for the web

In [None]:
import happybase
from flask import Flask, render_template

app = Flask(__name__)

@app.route('/')
def index():
    # Create a connection object to HBase
    connection = happybase.Connection('localhost', port=9090)

    # Create a table object in Happybase
    table_name = 'my_table'
    column_family = 'my_cf'
    table = connection.table(table_name)

    # Execute a scan operation to read data from HBase
    scan_results = table.scan()

    # Convert the scan results to a list of dictionaries
    data = []
    for key, values in scan_results:
        row_dict = {f'{column_family}:{column}': value.decode('utf-8') for column, value in values.items()}
        row_dict['row_key'] = key.decode('utf-8')
        data.append(row_dict)

    # Render the results in an HTML table using Jinja templates
    return render_template('index.html', data=data)

if __name__ == '__main__':
    app.run(debug=True)


<!doctype html>
<html>
  <head>
    <title>HBase Data</title>
  </head>
  <body>
    <h1>HBase Data</h1>
    <table>
      <thead>
        <tr>
          <th>Row Key</th>
          <th>Column 1</th>
          <th>Column 2</th>
          <th>Column 3</th>
        </tr>
      </thead>
      <tbody>
        {% for row in data %}
          <tr>
            <td>{{ row['row_key'] }}</td>
            <td>{{ row['my_cf:column1'] }}</td>
            <td>{{ row['my_cf:column2'] }}</td>
            <td>{{ row['my_cf:column3'] }}</td>
          </tr>
        {% endfor %}
      </tbody>
    </table>
  </body>
</html>
