In [12]:
import findspark
import os
import glob
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SQLContext,SparkSession

In [2]:
# Create (or reuse) the Spark session for this notebook. Despite its name, `sc`
# is a SparkSession rather than a SparkContext; the name is kept because later
# cells call `sc.read`.
sc = SparkSession.builder.appName("Gas_data_ETL").getOrCreate()
# SQLContext expects a SparkContext as its first argument, so pass the session's
# underlying context and hand the session over explicitly instead of passing the
# session where a context is expected.
sqlc = SQLContext(sc.sparkContext, sparkSession=sc)



In [3]:
# Presumably the data source name of the HBase-Spark connector.
# NOTE(review): not referenced by any cell visible in this notebook — confirm whether it is still needed.
data_source_format = 'org.apache.hadoop.hbase.spark'

In [4]:
# Load the raw gas-sensor CSV data, taking the column names from the header line.
df = sc.read.csv("GAS DATA", header=True)

In [5]:
# Show the column names and types Spark assigned when reading the CSV.
df.printSchema()

root
 |-- Time (s): string (nullable = true)
 |-- CO (ppm): string (nullable = true)
 |-- Humidity (%r.h.): string (nullable = true)
 |-- Temperature (C): string (nullable = true)
 |-- Flow rate (mL/min): string (nullable = true)
 |-- Heater voltage (V): string (nullable = true)
 |-- R1 (MOhm): string (nullable = true)
 |-- R2 (MOhm): string (nullable = true)
 |-- R3 (MOhm): string (nullable = true)
 |-- R4 (MOhm): string (nullable = true)
 |-- R5 (MOhm): string (nullable = true)
 |-- R6 (MOhm): string (nullable = true)
 |-- R7 (MOhm): string (nullable = true)
 |-- R8 (MOhm): string (nullable = true)
 |-- R9 (MOhm): string (nullable = true)
 |-- R10 (MOhm): string (nullable = true)
 |-- R11 (MOhm): string (nullable = true)
 |-- R12 (MOhm): string (nullable = true)
 |-- R13 (MOhm): string (nullable = true)
 |-- R14 (MOhm): string (nullable = true)



In [6]:
# Preview the first rows of the raw data.
df.show()

+--------+--------+----------------+---------------+------------------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+
|Time (s)|CO (ppm)|Humidity (%r.h.)|Temperature (C)|Flow rate (mL/min)|Heater voltage (V)|R1 (MOhm)|R2 (MOhm)|R3 (MOhm)|R4 (MOhm)|R5 (MOhm)|R6 (MOhm)|R7 (MOhm)|R8 (MOhm)|R9 (MOhm)|R10 (MOhm)|R11 (MOhm)|R12 (MOhm)|R13 (MOhm)|R14 (MOhm)|
+--------+--------+----------------+---------------+------------------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+
|  0.0000|  0.0000|         54.6258|        25.3178|          242.5724|            0.2030|  55.1483|  72.5638|  85.3974|  40.5104|  84.9922|  76.9817|  80.5302|  62.5385|  57.9460|   68.1441|   63.9090|   62.2197|   52.4943|   64.3090|
|  0.3100|  0.0000|         52.6300|        25.3000|    

In [7]:
from pyspark.sql.functions import monotonically_increasing_id

# Add an "id" column to serve as a unique row identifier.
# NOTE: monotonically_increasing_id() produces unique, increasing 64-bit values,
# but they are not guaranteed to be consecutive across partitions, so this is
# not a strict auto-increment counter.
df2=df.withColumn("id", monotonically_increasing_id())

In [8]:
# Preview the data including the new "id" column, without truncating cell values.
df2.show(truncate=False)

+--------+--------+----------------+---------------+------------------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+---+
|Time (s)|CO (ppm)|Humidity (%r.h.)|Temperature (C)|Flow rate (mL/min)|Heater voltage (V)|R1 (MOhm)|R2 (MOhm)|R3 (MOhm)|R4 (MOhm)|R5 (MOhm)|R6 (MOhm)|R7 (MOhm)|R8 (MOhm)|R9 (MOhm)|R10 (MOhm)|R11 (MOhm)|R12 (MOhm)|R13 (MOhm)|R14 (MOhm)|id |
+--------+--------+----------------+---------------+------------------+------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+----------+----------+----------+----------+----------+---+
|0.0000  |0.0000  |54.6258         |25.3178        |242.5724          |0.2030            |55.1483  |72.5638  |85.3974  |40.5104  |84.9922  |76.9817  |80.5302  |62.5385  |57.9460  |68.1441   |63.9090   |62.2197   |52.4943   |64.3090   |0  |
|0.3100  |0.0000  |52.6300         |25.3

In [9]:
# Save the DataFrame as CSV part files under "all_gaz_data".
# A header line is written to every part file because the HBase loading cell
# further down expects each file to start with a row of column names; without
# it, the first data row of the file would be mistaken for the header.
df2.write.format("csv").option("header", True).save("all_gaz_data")

In [10]:
from starbase import Connection

In [21]:
"""
Insert data into HBase with a Python script.
A gaz_data table has already been created in the HBase shell with three column families:
Mesurement, FIGARO TGS 3870 A-04, and FIS SB-500-12.

The original dataset can be downloaded from Kaggle at the link below:
https://www.kaggle.com/datasets/javi2270784/gas-sensor-array-temperature-modulation/data 
"""

import csv
import happybase
import time

# Configuration for the HBase load.
batch_size = 1000   # batch size handed to the HappyBase batch writer
host = "127.0.0.1"  # host HappyBase connects to
# NOTE(review): the part-file name embeds an identifier that differs from the
# file names shown in the recorded outputs — confirm this file actually exists.
file_path = "all_gaz_data/part-00012-3c96fcd8-695c-4a62-bdd2-1e581b6c717e-c000.csv"
row_count = 0
start_time = time.time()
# NOTE(review): the notes at the top of this cell and the recorded output refer
# to a table named "gaz_data" — confirm which name the HBase table really has.
table_name = "gas_data"
# HappyBase talks to the HBase Thrift server, whose default port is 9090.
# 16010 is the default port of the HBase Master web UI, which does not speak
# Thrift. Adjust if the Thrift server was started on a custom port.
port = 9090


def connect_to_hbase():
    """Open a HappyBase connection and prepare a batch writer on the target table.

    Reads the module-level ``host``, ``port``, ``table_name`` and ``batch_size``
    settings.

    Returns:
        A ``(connection, batch)`` tuple: the connection (which the caller is
        responsible for closing) and a batch created on ``table_name`` with
        the configured ``batch_size``.
    """
    connection = happybase.Connection(host=host, port=port)
    connection.open()
    writer = connection.table(table_name).batch(batch_size=batch_size)
    return connection, writer


def insert_row(batch, row):
    """Queue one CSV row for insertion into HBase.

    The row is written to the batch; it is up to the batch to decide when the
    queued rows are actually sent to the database.

    Args:
        batch: Object exposing ``put(row_key, data)`` (e.g. a HappyBase batch).
        row: Sequence of at least 21 values in the column order of the
            exported CSV:
            [ Time, CO, Humidity, Temperature, Flow rate, Heater voltage,
              R1 ... R14, id ]
            ``row[20]`` (the id) is used as the HBase row key.

    Raises:
        IndexError: If ``row`` has fewer than 21 values.
    """
    # Column qualifiers listed in the same order as the CSV columns, so the
    # value at position i of the row is stored under the qualifier at position i.
    # (Humidity comes before Temperature and Flow rate before Heater voltage
    # in the file; storing them the other way round mislabels the values.)
    columns = (
        "Mesurement:time",
        "Mesurement:co",
        "Mesurement:Humidity",
        "Mesurement:Temperature",
        "Mesurement:Flow_rate",
        "Mesurement:Heater_voltage",
        "FIGARO TGS 3870 A-04:R1",
        "FIGARO TGS 3870 A-04:R2",
        "FIGARO TGS 3870 A-04:R3",
        "FIGARO TGS 3870 A-04:R4",
        "FIGARO TGS 3870 A-04:R5",
        "FIGARO TGS 3870 A-04:R6",
        "FIGARO TGS 3870 A-04:R7",
        "FIS SB-500-12:R8",
        "FIS SB-500-12:R9",
        "FIS SB-500-12:R10",
        "FIS SB-500-12:R11",
        "FIS SB-500-12:R12",
        "FIS SB-500-12:R13",
        "FIS SB-500-12:R14",
    )
    row_key = row[20]
    batch.put(row_key, {name: row[index] for index, name in enumerate(columns)})


def read_csv(path=None):
    """Open a CSV file and return a reader over its rows.

    Args:
        path: Optional path of the CSV file to open. Defaults to the
            module-level ``file_path`` so existing no-argument calls keep
            working.

    Returns:
        A ``(csvreader, csvfile)`` tuple. The caller must close ``csvfile``
        once it has finished iterating over ``csvreader``.
    """
    if path is None:
        path = file_path
    # The csv module asks for file objects opened with newline="" so that line
    # breaks embedded in quoted fields reach the parser untouched.
    csvfile = open(path, "r", newline="")
    return csv.reader(csvfile), csvfile


# After everything has been defined, run the script.
conn, batch = connect_to_hbase()
print ("Connect to HBase. table name: %s, batch size: %i" % (table_name, batch_size))

try:
    # The file is opened inside the try block so that the HBase connection is
    # still closed if the file cannot be opened.
    csvreader, csvfile = read_csv()
    print ("Connected to file. name: %s" % (file_path))
    try:
        # Insert every data row. A row is skipped only when it is blank or when
        # it really is the header (its first field is the "Time (s)" column
        # name); skipping the first row unconditionally would drop a data row
        # whenever the file was exported without a header.
        for row in csvreader:
            if not row:
                continue
            if row[0] == "Time (s)":
                continue
            insert_row(batch, row)
            # row_count tracks the number of rows queued for insertion.
            row_count += 1

        # If there are any leftover rows in the batch, send them now.
        batch.send()
    finally:
        # No matter what happens, close the file handle.
        csvfile.close()
finally:
    # Always release the HBase connection.
    conn.close()

duration = time.time() - start_time
print ("Done. row count: %i, duration: %.3f s" % (row_count, duration))

Connect to HBase. table name: gaz_data, batch size: 1000
Connected to file. name: all_gaz_data\part-00000-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00001-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00002-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00003-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00004-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00005-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00006-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00007-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00008-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
Connected to file. name: all_gaz_data\part-00009-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c0

KeyboardInterrupt: 

In [18]:


# List the CSV part files found in the export directory.
# `files` is defined here so the cell does not depend on a variable left over
# from an earlier, no longer visible cell and therefore runs on a fresh kernel.
files = sorted(glob.glob(os.path.join("all_gaz_data", "part-*.csv")))
for f in files:
    print(f)

all_gaz_data\part-00000-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00001-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00002-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00003-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00004-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00005-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00006-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00007-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00008-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00009-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00010-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00011-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
all_gaz_data\part-00012-6ee4aa9b-ba89-4776-b598-0cc25375cadc-c000.csv
