In [3]:
import concurrent.futures
from collections import deque
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests
from joblib import Parallel, delayed

# pip install mmh3
import mmh3

#pip install redis
import redis

# Shared Redis client (localhost, port 6379, database 0). The quadruple
# de-duplication helpers in this notebook read and write the
# 'quadruple_hashes' set through it.
r = redis.Redis(host='localhost', port=6379, db=0)

   vertex_count  h11  h12   h13
0             6    1    0   426
1            23   14    0  1385
2            12   20    0  5828
3            20   46    0  6314
4            13   42    0  4832


In [None]:
def hash_quadruple(quadruple):
    """Return the MurmurHash3 (``mmh3.hash``) of a quadruple.

    The four components are joined with a ``,`` separator before hashing.
    Without a separator, distinct quadruples such as ``(1, 23, 4, 5)`` and
    ``(12, 3, 4, 5)`` serialise to the same string (``"12345"``) and are
    therefore indistinguishable to every caller that de-duplicates on the hash.

    NOTE: hashes produced by the earlier separator-less encoding do not match
    the ones produced here, so a previously populated 'quadruple_hashes' set
    has to be rebuilt.

    Args:
        quadruple: Indexable with at least four components (indices 0-3).

    Returns:
        The integer hash of the encoded quadruple.
    """
    # Convert the tuple to an unambiguous byte string and hash it
    quadruple_str = f'{quadruple[0]},{quadruple[1]},{quadruple[2]},{quadruple[3]}'.encode('utf-8')
    return mmh3.hash(quadruple_str)

def check_and_store_quadruple(quadruple):
    """Record a quadruple in Redis and report whether it was new.

    The quadruple's hash is added to the Redis set 'quadruple_hashes'.

    Args:
        quadruple: The quadruple to look up / record.

    Returns:
        True if the hash was not yet in the set (it has now been stored),
        False if it was already present.
    """
    quad_hash = hash_quadruple(quadruple)

    # SADD reports how many members were actually added, so one call both
    # tests for membership and stores the hash. The previous
    # SISMEMBER-then-SADD sequence let two worker threads both observe
    # "missing" and both go on to process the same quadruple.
    if r.sadd('quadruple_hashes', quad_hash):
        print(f"Processed and stored quadruple {quadruple}.")
        return True

    print(f"Quadruple {quadruple} already exists, skipping.")
    return False

def process_quadruple(quadruple):
    """Process a quadruple unless it has already been recorded.

    ``check_and_store_quadruple`` decides whether the quadruple is new; the
    outcome is reported on stdout either way. Returns None.
    """
    if not check_and_store_quadruple(quadruple):
        print(f"Skipping quadruple: {quadruple}")
        return

    # Placeholder for the real per-quadruple work; any processing logic
    # for a newly seen quadruple belongs here.
    print(f"Processing quadruple: {quadruple}")


def parallel_process_quadruples(quadruples, max_workers=5):
    """Run ``process_quadruple`` over ``quadruples`` on a thread pool.

    Args:
        quadruples: Iterable of quadruples to process.
        max_workers: Number of worker threads in the pool.

    Raises:
        Whatever ``process_quadruple`` raises; the first failing item's
        exception is re-raised in the calling thread.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # An exception raised inside a worker only surfaces when its result
        # is retrieved from the map iterator. Drain the iterator so failures
        # (e.g. a lost Redis connection) propagate instead of vanishing.
        for _ in executor.map(process_quadruple, quadruples):
            pass

def read_and_process_parquet_in_chunks(parquet_file, chunk_size=10000, max_workers=5):
    """Stream a Parquet file and process its (a, b, c, d) rows in parallel.

    ``pandas.read_parquet`` has no ``chunksize`` option, so the file is read
    as pyarrow record batches of at most ``chunk_size`` rows; each batch is
    handed to ``parallel_process_quadruples``.

    Args:
        parquet_file: Path of the Parquet file to read.
        chunk_size: Maximum number of rows per batch.
        max_workers: Worker-thread count passed to
            ``parallel_process_quadruples``.
    """
    # Assuming the Parquet file has the required columns 'a', 'b', 'c', 'd'
    columns = ['a', 'b', 'c', 'd']

    source = pq.ParquetFile(parquet_file)
    for batch in source.iter_batches(batch_size=chunk_size, columns=columns):
        chunk = batch.to_pandas()
        # Re-select by name so the tuple order is always (a, b, c, d).
        quadruples = chunk[columns].itertuples(index=False, name=None)  # Convert to tuple format
        parallel_process_quadruples(quadruples, max_workers)

# Example usage: point this at the Parquet file whose quadruples should be
# de-duplicated (placeholder path -- replace with the real file).
parquet_file = 'large_file.parquet'

# Stream the file in 10,000-row chunks, using 5 worker threads per chunk.
read_and_process_parquet_in_chunks(
    parquet_file,
    chunk_size=10000,
    max_workers=5,
)


#####################
#  OLD CODE
#####################


# Threaded run over a small test set; (1, 2, 3, 4) appears twice on purpose.
quadruples = [(1, 2, 3, 4), (5, 6, 7, 8), (1, 2, 3, 4), (9, 10, 11, 12)]  # Test quadruples
parallel_process_quadruples(quadruples, max_workers=5)


# Sequential version of the same check, one quadruple at a time.
quadruples = [(1, 2, 3, 4), (5, 6, 7, 8), (1, 2, 3, 4)]  # Some test quadruples

for quadruple in quadruples:
    # Only quadruples that were not already recorded in Redis go on to be processed.
    if not check_and_store_quadruple(quadruple):
        print(f"Skipping quadruple: {quadruple}")
        continue
    print(f"Processing quadruple: {quadruple}")

def process_chunk(chunk):
    """Placeholder per-chunk preprocessing hook.

    Currently a pass-through: the chunk is handed back unchanged (the very
    same object, not a copy).
    """
    return chunk

def preprocess(chunk):
    """Keep three columns of ``chunk`` and drop rows with negative 'column1'.

    NOTE: a later ``def preprocess`` in this notebook replaces this
    definition once that cell runs.

    Args:
        chunk: Frame-like object containing 'column1', 'column2', 'column3'.

    Returns:
        The filtered selection; the original row index is preserved.
    """
    # Column selection -- adjust the list to the data at hand.
    kept = chunk[['column1', 'column2', 'column3']]

    # Row filter: only non-negative 'column1' values survive.
    non_negative = kept['column1'] >= 0
    return kept[non_negative]

#######################
# Shrink the data set:
#   large_file.parquet -> processed_output.parquet
#
#   Use a Redis cache as the shared knowledge base
#   holding the filtered/clean data.
#######################

# Define the preprocessing function
def preprocess(chunk):
    """Select, clean, filter and fingerprint one chunk.

    Steps: keep 'column1'..'column3', replace missing values with 0, drop
    rows whose 'column1' is negative, then add a 'hash' column holding the
    builtin ``hash`` of each row's kept values.

    The previous version hashed an undefined ``(a, b, c, d)`` tuple, which
    raised NameError, and would have assigned one scalar to every row.

    NOTE: builtin ``hash`` is only stable across interpreter runs for
    numeric values; string hashes are salted per process.

    Args:
        chunk: DataFrame containing at least the three kept columns.

    Returns:
        A new DataFrame (the input is not modified) with the kept columns
        plus 'hash'.
    """
    # 1. Drop Unnecessary Columns
    columns_to_keep = ['column1', 'column2', 'column3']  # Adjust based on your needs
    chunk = chunk[columns_to_keep]

    # 2. Handle Missing Values
    chunk = chunk.fillna(0)

    # 3. Filter Data (e.g., Remove rows where 'column1' is less than 0).
    #    Copy so that adding a column below does not write into a slice.
    chunk = chunk[chunk['column1'] >= 0].copy()

    # 4. Feature Engineering: one hash per row, computed from the kept columns
    chunk['hash'] = [hash(row) for row in chunk.itertuples(index=False, name=None)]

    return chunk


# Function to process a single chunk and write it to a file
def process_and_save_chunk(chunk, output_file):
    """Preprocess one chunk and write the result to ``output_file``.

    Args:
        chunk: Chunk handed to ``preprocess`` (presumably a pandas
            DataFrame -- confirm against the caller).
        output_file: Destination passed to ``to_parquet``.

    Returns:
        The number of rows in the preprocessed chunk.
    """
    processed_chunk = preprocess(chunk)
    # NOTE(review): `mode='append'` does not appear to be a documented
    # `DataFrame.to_parquet` parameter; extra keywords are forwarded to the
    # Parquet engine. Confirm the engine in use accepts it -- otherwise this
    # call raises instead of appending. Also confirm that concurrent calls
    # targeting the same file are safe.
    processed_chunk.to_parquet(output_file, mode='append')  # Save to file in append mode
    return len(processed_chunk)  # Return the number of processed rows for logging purposes


def parallel_process_parquet(input_file, output_file, chunk_size=10000, max_workers=5):
    """Preprocess a Parquet file chunk by chunk and write one output file.

    The input is streamed as pyarrow record batches (``pandas.read_parquet``
    has no ``chunksize`` option). ``preprocess`` runs on a thread pool, but
    all writing happens in the calling thread through a single
    ``ParquetWriter``: several threads appending to the same Parquet file is
    not something the format supports. Results are written in input order
    and at most ``2 * max_workers`` chunks are in flight, so memory use stays
    bounded.

    Args:
        input_file: Path of the Parquet file to read.
        output_file: Path of the Parquet file to create. Not created when the
            input yields no batches.
        chunk_size: Maximum number of rows per batch.
        max_workers: Number of preprocessing threads.

    Prints the total number of rows written. Returns None.
    """
    source = pq.ParquetFile(input_file)
    pending = deque()
    max_pending = 2 * max_workers
    writer = None
    total_rows = 0

    def _write(processed_chunk):
        # Append one preprocessed chunk to the output file.
        nonlocal writer, total_rows
        # Re-use the first chunk's schema so later chunks are coerced to it
        # rather than rejected on a dtype difference.
        schema = writer.schema if writer is not None else None
        table = pa.Table.from_pandas(processed_chunk, schema=schema, preserve_index=False)
        if writer is None:
            writer = pq.ParquetWriter(output_file, table.schema)
        writer.write_table(table)
        total_rows += len(processed_chunk)

    try:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Read in chunks and submit each chunk for parallel preprocessing
            for batch in source.iter_batches(batch_size=chunk_size):
                pending.append(executor.submit(preprocess, batch.to_pandas()))
                if len(pending) >= max_pending:
                    _write(pending.popleft().result())

            # Drain whatever is still in flight, oldest first.
            while pending:
                _write(pending.popleft().result())
    finally:
        # Close the writer even on failure so the file handle is released.
        if writer is not None:
            writer.close()

    print(f"Processed {total_rows} rows in total.")

# Source and destination of the shrink-down run.
input_file = 'large_file.parquet'
output_file = 'processed_output.parquet'

parallel_process_parquet(
    input_file,
    output_file,
    chunk_size=10000,
    max_workers=80,
)

In [None]:
# Load the Parquet file
file_path = '/run/media/eva/01DA95886B3817A0/Mathfiles/reflexive/ws-5d-reflexive-0000.parquet'

# Select the specific columns you're interested in
selected_columns = ['vertex_count','h11', 'h12', 'h13']

# pandas.read_parquet has no `chunksize` argument, so stream the file as
# pyarrow record batches and decode only the selected columns.
chunk_size = 10000  # Adjust this based on available memory
batches = pq.ParquetFile(file_path).iter_batches(batch_size=chunk_size, columns=selected_columns)

# Run the per-chunk hook over every batch on 4 joblib workers.
results = Parallel(n_jobs=4)(delayed(process_chunk)(batch.to_pandas()) for batch in batches)

# Reassemble the processed chunks into one frame.
df = pd.concat(results, ignore_index=True)
df_selected = df[selected_columns]

# Process your selected columns
print(df_selected.head())  # Display the first few rows of the selected columns

In [2]:
import pyarrow.parquet as pq
import pyarrow as pa
import glob
from collections import Counter
from collections import Counter
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Pool, Manager

# Every Parquet file in the reflexive-polytope data directory.
parquet_files = glob.glob(
    '/run/media/eva/01DA95886B3817A0/Mathfiles/reflexive/*.parquet'
)
# Function to process a single batch
def process_batch(batch):
    """Count the (h11, h12, h13) triples contained in one batch.

    Args:
        batch: Object exposing ``to_pandas()`` whose resulting frame has the
            columns 'h11', 'h12' and 'h13'.

    Returns:
        Counter mapping each (h11, h12, h13) tuple to its number of rows.
    """
    frame = batch.to_pandas()
    # Counter tallies the row-wise triples directly from the zipped columns.
    return Counter(zip(frame['h11'], frame['h12'], frame['h13']))

# Function to process a single file
def process_file(file):
    """Count the (h11, h12, h13) triples of one Parquet file.

    The file is read batch by batch, loading only the three counted columns,
    and each batch's tally from ``process_batch`` is folded into a per-file
    total.

    Args:
        file: Path of the Parquet file.

    Returns:
        Counter with the triple counts for the whole file.
    """
    totals = Counter()
    reader = pq.ParquetFile(file)

    for record_batch in reader.iter_batches(columns=['h11', 'h12', 'h13']):
        totals.update(process_batch(record_batch))

    return totals

# Function to merge counters
def merge_counters(counters):
    """Add up an iterable of counters into one new Counter.

    Args:
        counters: Iterable of Counter objects (e.g. one per file).

    Returns:
        A fresh Counter holding the summed counts; the inputs are not
        modified.
    """
    combined = Counter()
    for partial in counters:
        combined.update(partial)
    return combined

# Count triples across all files, one file per worker task
if __name__ == "__main__":
    # Pool() without arguments sizes itself; each task tallies a whole file.
    with Pool() as pool:
        results = pool.map(process_file, parquet_files)

    # Fold the per-file tallies into the overall count.
    final_counter = merge_counters(results)



In [3]:
final_counter

Counter({(25827, 0, 13): 493818,
         (28348, 0, 12): 487360,
         (23426, 0, 14): 290797,
         (22386, 0, 15): 243230,
         (30989, 0, 11): 215392,
         (24738, 0, 14): 179328,
         (23946, 0, 14): 173316,
         (25746, 0, 14): 157995,
         (27548, 0, 12): 145174,
         (20154, 0, 16): 142496,
         (22385, 0, 15): 138997,
         (24065, 0, 15): 130233,
         (21065, 0, 16): 122199,
         (25747, 0, 13): 110776,
         (20824, 0, 16): 107167,
         (21145, 0, 15): 106185,
         (23345, 0, 15): 93902,
         (23226, 0, 14): 87611,
         (19618, 0, 18): 86212,
         (21545, 0, 15): 75951,
         (27203, 0, 17): 75843,
         (21344, 0, 16): 74024,
         (27427, 0, 13): 73788,
         (23145, 0, 15): 73297,
         (18905, 0, 17): 73214,
         (10045, 0, 7): 72873,
         (14189, 0, 12): 72323,
         (16959, 0, 10): 71966,
         (21823, 0, 17): 70921,
         (23320, 0, 16): 69463,
         (3277, 0, 3): 68