### De-Dupping using MP

In [4]:
import pandas as pd
import torch
import numpy as np
import os
import matplotlib.pyplot as plt
import torch
import time
import torch.multiprocessing as mp
%reload_ext jupyternotify

CrystalData = pd.read_csv('Crystal Database.csv')

<IPython.core.display.Javascript object>

  CrystalData = pd.read_csv('Crystal Database.csv')


In [5]:
# Function to process the DataFrame on each GPU
def process_data_on_gpu(rank, world_size, CrystalData, shared_dict):
    # Filter and prepare the DataFrame
    booleanFilter = CrystalData['name'].value_counts() > 1
    booleanFilter = booleanFilter[booleanFilter]
    CrystalDuplicatesDF = CrystalData[CrystalData['name'].isin(booleanFilter.index)]
    all_unique_names = CrystalDuplicatesDF['name'].unique()
    
    # Process chunks of unique names on each GPU
    chunk_size = len(all_unique_names) // world_size
    start_idx = rank * chunk_size
    end_idx = (rank + 1) * chunk_size if rank != world_size - 1 else len(all_unique_names)
    unique_names_chunk = all_unique_names[start_idx:end_idx]
    if rank == 0:
        print(f'Splitting {len(all_unique_names)}')
    # Initialize a local dictionary for the current process
    local_dict = {}
    #split dataframe into dictionary of dataframes with multiple entires of the same chemical name
    for name in unique_names_chunk:
        #print('Splitting' + name)
        group = CrystalDuplicatesDF[CrystalDuplicatesDF['name'] == name]
        local_dict[name] = group

    # Store the local dictionary in the shared dictionary
    shared_dict[rank] = local_dict
    print(f'Finished {rank}/{world_size} Splitting Process')

# Function to filter each dataframe from the dictionary to delete all merges,
# Algorithm to Delete Merges from Set:
    # Takes newest entry based on pubyear
    # if there are multiple of the same year or NULL years, 
    #    takes entry with cloest to medain volume in that range
    #rest are added to "toRe" dataframe
def filter_data_on_gpu(rank, world_size, shared_dict, filtered_shared_dict, toRe_shared_dict):
    local_dict = shared_dict[rank]
    filtered_local_dict = {}
    toRe_local = pd.DataFrame()

    for x in local_dict.keys():
        df = local_dict.get(x)
        #verifies that we are only grabbing those with the same space group
        booleanFilter = df['Space Group'].value_counts() > 1 
        booleanFilter = booleanFilter[booleanFilter]
        if len(booleanFilter) == 0:
            continue

        df = df[df['Space Group'].isin(booleanFilter.index)].sort_values(by='Space Group')
        unique_spaceGroups = df['Space Group'].unique()
        differnetSpaceGroups = {sGroup: df[df['Space Group'] == sGroup] for sGroup in unique_spaceGroups}
        spaceGroupsLeft = list(differnetSpaceGroups.keys())

        for p in spaceGroupsLeft:
            df = differnetSpaceGroups.get(p)
            filters = df['Space Group'].value_counts() > 1
            filters = filters[filters]
            df = df[df['Space Group'].isin(filters.index)]
            df['Publication Year'] = df['Publication Year'].astype(str)
            non_numeric_strings = df[df['Publication Year'].apply(lambda x: not x.isnumeric())]

            if non_numeric_strings.shape[0] > 0:
                non_numeric_mask = ~df['Publication Year'].str.isnumeric()
                df_non_numeric = df[non_numeric_mask]
                df = df[df['Publication Year'].str.isnumeric()]
                if df.shape[0] == 0:
                    df_non_numeric = df_non_numeric.iloc[1:]
                    toRe_local = pd.concat([toRe_local, df_non_numeric], axis=0)
                    continue
                else:
                    toRe_local = pd.concat([toRe_local, df_non_numeric], axis=0)

            highestYear = df.loc[df['Publication Year'].astype(int) == df['Publication Year'].astype(int).max()]
            if len(highestYear) == 1:
                removedElements = df.loc[df['Publication Year'].astype(int) != df['Publication Year'].astype(int).max()]
            else:
                toRe_local = pd.concat([df.loc[df['Publication Year'].astype(int) != df['Publication Year'].astype(int).max()], toRe_local], axis=0)
                data_median = highestYear['Volume'].median()
                data_diff = (highestYear['Volume'] - data_median).abs()
                min_diff = data_diff.min()
                closest_indices = data_diff[data_diff == min_diff].index
                closest_index = closest_indices[0]
                removedElements = highestYear.loc[highestYear.index != closest_index]
            
            toRe_local = pd.concat([toRe_local, removedElements], axis=0)
    filtered_shared_dict[rank] = filtered_local_dict
    toRe_shared_dict[rank] = toRe_local
    print(f'Finshed {rank}/{world_size} Filtering Process')

In [6]:
%%notify
world_size = 4
print(f'Amount of GPUs in use: {world_size}')
manager = mp.Manager()
shared_dict = manager.dict()
filtered_shared_dict = manager.dict()
toRe_shared_dict = manager.dict()

start_time = time.time()
# Create and start processes for initial processing
processes = []
print('Intalziing Splitting Process')
for rank in range(world_size):
    p = mp.Process(target=process_data_on_gpu, args=(rank, 4, CrystalData, shared_dict))
    print(f'Starting {rank}/{world_size} Splitting Process')
    p.start()
    processes.append(p)

# Join processes for initial processing
for p in processes:
    p.join()
    
end_time = time.time()  
print('Splitting Finished')
print(f"Splitting took {end_time - start_time:.2f} seconds") #2002,6 secibds

start_time = time.time()
# Create and start processes for filtering
processes = []
print('Intialziing Filtering Process')
for rank in range(world_size):
    p = mp.Process(target=filter_data_on_gpu, args=(rank, world_size, shared_dict, filtered_shared_dict, toRe_shared_dict))
    print(f'Starting {rank}/{world_size} Filtering Process')
    p.start()
    processes.append(p)

# Join processes for filtering
for p in processes:
    p.join()

# Merge results from all GPUs
CrystalDuplicatesDict = {}
for local_dict in filtered_shared_dict.values():
    CrystalDuplicatesDict.update(local_dict)

end_time = time.time()  
print('Filtering Finished')
print(f"Filtering took {end_time - start_time:.2f} seconds")  #671.84 seconds

toRemove = pd.DataFrame()
for df in toRe_shared_dict.values():
    toRemove = pd.concat([toRemove, df], axis=0)

#toRemove is a dataframe with all duplicate values that need to be removed


Amount of GPUs in use: 4
Intalziing Splitting Process
Starting 0/4 Splitting Process
Starting 1/4 Splitting Process
Starting 2/4 Splitting Process
Starting 3/4 Splitting Process
Splitting148931
Finished 0/4 Splitting Process
Finished 2/4 Splitting Process
Finished 3/4 Splitting Process
Finished 1/4 Splitting Process
Splitting Finished
Splitting took 2035.68 seconds
Intialziing Filtering Process
Starting 0/4 Filtering Process
Starting 1/4 Filtering Process
Starting 2/4 Filtering Process
Starting 3/4 Filtering Process
Finshed 2/4 Filtering Process
Finshed 1/4 Filtering Process
Finshed 3/4 Filtering Process
Finshed 0/4 Filtering Process
Filtering Finished
Filtering took 680.90 seconds


<IPython.core.display.Javascript object>

In [None]:
CrystalDatabaseWithoutDups = CrystalData[~CrystalData.index.isin(toRemove.index)]
CrystalDatabaseWithoutDups.to_csv('De_Dupped_Data.csv')

### Graphs

In [None]:
value_counts_dups = CrystalData['Bravais Lattice'].value_counts()
value_counts_ddups = CrystalDatabaseWithoutDups['Bravais Lattice'].value_counts()


# Plotting
plt.figure(figsize=(16, 12)) # Adjust the size of the figure if needed
# Plot for dups
plt.subplot(121)
value_counts_dups.plot(kind='bar', color='red',alpha=0.8, label='Duplicated Data')
value_counts_ddups.plot(kind='bar', color='green',label='De-Duplicated Data')
plt.ylabel('Count')
plt.title("ICSD and CSD Bravais Lattices for Duplicated vs De-Duplicated Data")
plt.legend()