In [52]:
import happybase

In [53]:
import findspark

# Must run before the pyspark import in the next cell: findspark locates the
# local Spark installation and makes the pyspark package importable.
findspark.init()

In [54]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)

# HDFS location of the merged parquet dataset.
parquet_file_path = "hdfs:///outpucik/merged_data.parquet"

# Read the parquet file into a Spark DataFrame.
merged_data = spark.read.parquet(parquet_file_path)

In [55]:
# Columns stored under the "results" column family.
results_details_family = [
    'resultId', 'constructorId', 'results_number', 'grid',
    'position', 'positionText', 'positionOrder', 'points',
    'laps', 'results_time', 'milliseconds', 'fastestLap',
    'rank', 'fastestLapTime', 'fastestLapSpeed', 'statusId',
]

# Columns stored under the "races" column family.
races_family = [
    'raceId', 'year', 'round', 'circuitId',
    'name', 'date', 'race_time', 'url',
]

# Columns stored under the "driver" column family.
driver_family = [
    'driverId', 'driverRef', 'driver_number', 'code', 'forename',
    'surname', 'dob', 'nationality', 'driver_url',
]

# Per-family lookup tables: column name -> column family name.
results_dict = dict.fromkeys(results_details_family, "results")
races_dict = dict.fromkeys(races_family, "races")
driver_dict = dict.fromkeys(driver_family, "driver")

# Single column -> family mapping, merged in the order results, races, driver.
final_families = {
    column: family
    for family_map in (results_dict, races_dict, driver_dict)
    for column, family in family_map.items()
}

In [56]:
import numpy as np

In [57]:
# Pull every row to the driver so rows can be picked by position.
# NOTE(review): collect() materialises the whole DataFrame in driver memory;
# acceptable only while the dataset stays small.
cache_data = merged_data.collect()

# Draw 4 row positions at random. replace=False guarantees four *distinct*
# rows; the default (sampling with replacement) could return the same
# position more than once.
indices = np.random.choice(len(cache_data), 4, replace=False)
indices

array([ 3373, 19921, 16375,  8307])

In [58]:
# Look up the collected rows at the randomly chosen positions.
four_rows_data = [cache_data[position] for position in indices]

In [59]:
import pandas as pd
from io import BytesIO

# One dict per sampled row: a 1-based "row_key" plus one "family:column"
# entry for every column listed in final_families.
sample_data = []

for row_number, spark_row in enumerate(four_rows_data, start=1):
    values_by_column = spark_row.asDict()
    record = {"row_key": str(row_number)}
    for column_name, family in final_families.items():
        qualified_name = f"{family}:{column_name}"
        record[qualified_name] = values_by_column[column_name]
    sample_data.append(record)

In [60]:
# sample_data

In [62]:
# Name of the HBase table used by the cells below.
hbase_table_name = 'f1_results'

# Connect to the HBase server running on this machine.
# NOTE(review): happybase connects on construction by default, so the explicit
# open() is presumably redundant (and harmless) — confirm before removing.
connection = happybase.Connection(host='localhost')
connection.open()

In [63]:
def create_or_replace_table(connection, table_name, families=None):
    """Create ``table_name``, dropping any existing table of that name first.

    Args:
        connection: Open happybase connection (anything exposing ``tables()``,
            ``delete_table()`` and ``create_table()``).
        table_name: Name of the table as ``str``.
        families: Optional iterable of column family names to create. When
            omitted, the distinct values of the notebook-level
            ``final_families`` mapping are used, as before.

    Warning:
        Any data in an existing table with the same name is deleted.
    """
    if families is None:
        # Backward-compatible default: derive the families from the
        # notebook-level column -> family mapping.
        families = set(final_families.values())

    # tables() yields names as bytes, hence the encode() for the comparison.
    if table_name.encode() in connection.tables():
        connection.delete_table(table_name, disable=True)

    # An empty options dict per family means "use the default settings".
    connection.create_table(
        table_name,
        {family: dict() for family in families},
    )

# Create the table, replacing it if it already exists
create_or_replace_table(connection, hbase_table_name)

In [64]:
# List the table names (returned as bytes) to check that the table was created.
connection.tables()

[b'books',
 b'employees',
 b'f1_results',
 b'mytable',
 b'person',
 b'prog_langs',
 b'test',
 b'test2',
 b'trains',
 b'trees',
 b'tweets',
 b'wifi']

In [69]:
# Handle to the HBase table used by the following cells.
table = connection.table(hbase_table_name)

# Show which column families the table actually has.
family_names = table.families().keys()
print(family_names)

dict_keys([b'driver', b'races', b'results'])


In [70]:
# Write each sampled record to HBase as one row.
# The record dicts are only read, never modified, so this cell can be re-run;
# popping "row_key" out of them would make a second run fail with KeyError.
for data in sample_data:
    row_key = data["row_key"]
    row_key_encoded = row_key.encode()
    # happybase expects bytes for both column names and values.
    # Note: str(v) turns a None value into the literal text "None".
    cells = {
        str(k).encode(): str(v).encode()
        for k, v in data.items()
        if k != "row_key"
    }
    table.put(row_key_encoded, cells)

In [71]:
def read_table_content(connection, table_name):
    """Scan a whole HBase table and return its rows as a pandas DataFrame.

    Args:
        connection: Object whose ``table(name)`` returns a table with a
            ``scan()`` method yielding ``(row_key, {column: value})`` pairs
            of bytes.
        table_name: Name of the table to read.

    Returns:
        A DataFrame with one row per scanned row: a ``row_key`` column plus
        one column per cell, with all keys and values decoded to ``str``.
    """
    records = []
    for raw_key, cells in connection.table(table_name).scan():
        record = {'row_key': raw_key.decode()}
        record.update(
            (qualifier.decode(), payload.decode())
            for qualifier, payload in cells.items()
        )
        records.append(record)

    return pd.DataFrame(records)

# Print the table as it stands after the initial load. The label below is
# Polish for "Table contents before the update:".
print("Zawartość tabeli przed aktualizacją:")
print(read_table_content(connection, hbase_table_name))

Zawartość tabeli przed aktualizacją:
  row_key driver:code  driver:dob driver:driverId driver:driverRef  \
0       1         FIS  14/01/1973              21       fisichella   
1       2        None  30/10/1906             642           farina   
2       3        None  12/11/1945             352            eaton   
3       4        None  04/07/1948             163           arnoux   

  driver:driver_number                                  driver:driver_url  \
0                 None  http://en.wikipedia.org/wiki/Giancarlo_Fisichella   
1                 None           http://en.wikipedia.org/wiki/Nino_Farina   
2                 None          http://en.wikipedia.org/wiki/George_Eaton   
3                 None      http://en.wikipedia.org/wiki/Ren%C3%A9_Arnoux   

  driver:forename driver:nationality driver:surname  ... results:milliseconds  \
0       Giancarlo            Italian     Fisichella  ...                 None   
1            Nino            Italian         Farina  ...        