In [1]:
# USING MULTIPROCESSING

In [None]:
import pandas as pd
import multiprocessing as mp
from multiprocessing import Value, Lock
from tqdm import tqdm
import time 
import csv

# Define a global shared variable and lock
# Module-level shared state used by filter_chunk_early_stop (inside pool workers) and
# process_file_with_tqdm (in the parent). Workers reach these objects because they are
# created before the pool is started.
# NOTE(review): this relies on the 'fork' start method (Linux default); under 'spawn'
# each worker would get its own fresh objects and the count would not be shared.
max_reached = Value('i', 0)  # Shared variable to count rows found (rows handed out across all chunks)
lock = Lock()  # Lock to synchronize access to the shared variable (makes check-and-increment atomic)

def filter_chunk_early_stop(chunk, query, max_num):
    """Filter one chunk by ``query`` and claim at most the rows still allowed by ``max_num``.

    Runs inside a pool worker. The shared ``max_reached`` counter records how many
    matching rows all chunks have claimed so far; ``lock`` makes the check-and-increment
    atomic, so the chunks together never return more than ``max_num`` rows.

    Args:
        chunk: DataFrame slice of the input file; must contain ``taxonKey`` and every
            column named in ``query``.
        query: mapping column -> value; a row is kept only if every column equals its value.
        max_num: global cap on rows returned across all chunks.

    Returns:
        DataFrame with the first matching rows this chunk may contribute; an empty
        DataFrame once the cap has been reached.
    """
    # Cheap pre-check (Value.value has its own internal lock): the counter only grows,
    # so once the cap is reached there is no point filtering this chunk. Chunks that
    # were queued before the reader stopped then finish almost instantly.
    if max_reached.value >= max_num:
        return pd.DataFrame()

    filtered = chunk.dropna(subset=['taxonKey'])
    for key, value in query.items():
        filtered = filtered[filtered[key] == value]
        if filtered.empty:
            break

    with lock:  # re-check and claim rows atomically with respect to the other workers
        remaining = max_num - max_reached.value
        if remaining <= 0:
            return pd.DataFrame()
        rows_to_add = min(len(filtered), remaining)
        max_reached.value += rows_to_add
        return filtered.head(rows_to_add)

def process_file_with_tqdm(file_path, query, max_num, chunksize=100000, num_processes=10):
    """Return up to ``max_num`` rows of a large TSV that match ``query``, stopping early.

    The file is read in chunks of ``chunksize`` rows (columns ``gbifID`` and
    ``taxonKey`` only, quoting disabled). Each chunk is filtered by
    ``filter_chunk_early_stop`` in a pool of ``num_processes`` workers. The workers
    share the global ``max_reached`` counter, and reading stops once it reaches ``max_num``.

    Args:
        file_path: path to the tab-separated file.
        query: mapping column -> required value, forwarded to ``filter_chunk_early_stop``.
        max_num: maximum number of rows to return.
        chunksize: rows per chunk handed to a worker.
        num_processes: pool size.

    Returns:
        DataFrame with at most ``max_num`` matching rows, in chunk-submission order.
        When nothing matches, an empty DataFrame with the ``gbifID``/``taxonKey`` columns.
    """
    columns = ['gbifID', 'taxonKey']

    # The counter is module-level state. Reset it so a second call in the same kernel
    # does not start "full" and return nothing. This happens before the pool exists,
    # so the workers see the reset value.
    with lock:
        max_reached.value = 0

    pool = mp.Pool(num_processes)
    results = []
    try:
        with tqdm(total=max_num, desc="Rows Found", position=0, leave=True) as pbar:
            reader = pd.read_csv(file_path, delimiter='\t', quoting=csv.QUOTE_NONE,
                                 chunksize=chunksize, usecols=columns)
            with reader:  # closes the file handle even when we stop early
                for chunk in reader:
                    results.append(pool.apply_async(filter_chunk_early_stop,
                                                    args=(chunk, query, max_num)))
                    # One locked read drives both the progress bar and the stop decision.
                    with lock:
                        found = max_reached.value
                    pbar.n = found
                    pbar.refresh()
                    if found >= max_num:
                        break

            pool.close()
            pool.join()  # let already-submitted chunks finish
            # Workers may have claimed more rows while we waited, so show the final count.
            pbar.n = max_reached.value
            pbar.refresh()
    finally:
        # No-op after a clean close()/join(); stops the workers if anything above raised.
        pool.terminate()

    # get() re-raises worker exceptions; fetch each result only once.
    frames = [frame for frame in (res.get() for res in results) if not frame.empty]
    if not frames:
        # pd.concat([]) raises ValueError, so return an empty frame explicitly.
        return pd.DataFrame(columns=columns)
    return pd.concat(frames).head(max_num)

# Example usage: collect up to `max_num` occurrences of one taxon.
file_path = '/workdir/datasets/GBIF/occurrence.txt'  # tab-separated GBIF occurrence table
query = {'taxonKey': 2930137}  # column -> value every returned row must have
max_num = 1000  # stop once this many matching rows have been collected

# perf_counter is a monotonic clock intended for measuring elapsed time;
# time.time() can jump if the system clock is adjusted during the run.
start = time.perf_counter()
result = process_file_with_tqdm(file_path, query, max_num)
print(time.perf_counter() - start, result)

Rows Found:   0%|          | 0/1000 [00:00<?, ?it/s]

Rows Found: 100%|██████████| 1000/1000 [01:56<00:00,  8.59it/s]

116.52868795394897               gbifID  taxonKey
34149     3892886428   2930137
45187     3872936011   2930137
58122     4431071025   2930137
62690     2883212626   2930137
88869     4420881430   2930137
...              ...       ...
14600031  4420512285   2930137
14614447  1228033290   2930137
14615977   575146991   2930137
14615979   574864780   2930137
14641706   575014709   2930137

[1000 rows x 2 columns]





In [19]:
import pandas as pd
from tqdm import tqdm
import multiprocessing as mp
from multiprocessing import Manager, Lock
import csv

# Shared state for filter_chunk_limited / filter_rows_by_set_limited.
# Manager() starts a server process that hosts proxy objects; filter_rows_by_set_limited
# creates its row counter with manager.Value and passes it to the workers.
manager = Manager()
found_list = manager.list()  # Shared list to track found values. NOTE(review): not referenced by any visible code; candidate for removal
# Lock to synchronize updates of the shared row counter.
# NOTE: this rebinds the global name `lock`, which the earlier occurrence-filtering
# functions also reference; after this cell runs they use this lock object.
lock = Lock()  # Lock to synchronize updates

def extract_column_values_to_set(df, column_name):
    """Collect the distinct values of ``df[column_name]`` into a Python set.

    Args:
        df: DataFrame to read from.
        column_name: name of the column whose values are collected.

    Returns:
        set: one entry per distinct value, as produced by ``Series.unique``.
    """
    distinct = df[column_name].unique()
    return {value for value in distinct}

def filter_chunk_limited(chunk, column_name, value_set, max_filt, total_rows):
    """Return the rows of ``chunk`` whose ``column_name`` value is in ``value_set``, capped globally.

    Runs inside a pool worker. ``total_rows`` is a shared counter (a Manager ``Value``)
    of rows already claimed by all chunks; the global ``lock`` makes the
    check-and-increment atomic, so all chunks together return at most ``max_filt`` rows.

    Args:
        chunk: DataFrame slice; must contain ``column_name`` and ``identifier``.
        column_name: key column, compared as int against ``value_set``.
        value_set: set of accepted key values.
        max_filt: global cap on returned rows.
        total_rows: shared counter with a ``.value`` attribute.

    Returns:
        (rows, done): ``rows`` is the (possibly empty) DataFrame this chunk contributes;
        ``done`` is True once the shared counter has reached ``max_filt``.
    """
    # Cheap pre-check: the counter only grows, so once the cap is reached skip the filtering.
    if total_rows.value >= max_filt:
        return pd.DataFrame(), True

    # Drop rows missing either field: astype(int) below raises on NaN keys.
    chunk = chunk.dropna(subset=['identifier', column_name])
    filtered = chunk[chunk[column_name].astype(int).isin(value_set)]
    with lock:
        rows_to_add = max_filt - total_rows.value
        if rows_to_add <= 0:
            return pd.DataFrame(), True  # Signal to stop further processing
        limited_filtered = filtered.head(rows_to_add)
        total_rows.value += len(limited_filtered)
        # Read the flag while still holding the lock so it matches this update.
        done = total_rows.value >= max_filt
    return limited_filtered, done

def filter_rows_by_set_limited(file_path, column_name, value_set, chunksize=10000, num_processes=4, max_filt=5000):
    """Return up to ``max_filt`` rows of a large TSV whose ``column_name`` value is in ``value_set``.

    The file is read in chunks of ``chunksize`` rows (quoting disabled). Each chunk is
    filtered by ``filter_chunk_limited`` in a pool of ``num_processes`` workers. The
    workers share a Manager-backed row counter, so at most ``max_filt`` rows are
    returned, and reading stops as soon as the counter reaches the cap.

    Args:
        file_path: path to the tab-separated file.
        column_name: key column to test against ``value_set``.
        value_set: set of accepted key values.
        chunksize: rows per chunk handed to a worker.
        num_processes: pool size.
        max_filt: maximum number of rows to return.

    Returns:
        DataFrame with a fresh RangeIndex, in chunk-submission order. When nothing
        matches, an empty DataFrame with the columns that were read.
    """
    # Always read gbifID/identifier as before, plus the key column if it is a different
    # one (previously a non-gbifID column_name raised KeyError). dict.fromkeys de-duplicates
    # while keeping order.
    usecols = list(dict.fromkeys(['gbifID', column_name, 'identifier']))
    total_rows = manager.Value('i', 0)  # Shared counter for total rows collected
    pool = mp.Pool(num_processes)
    results = []
    try:
        with tqdm(desc=f"Filtering Rows (Total Rows: 0/{max_filt})", position=0, leave=True) as pbar:
            reader = pd.read_csv(file_path, delimiter='\t', quoting=csv.QUOTE_NONE,
                                 chunksize=chunksize, usecols=usecols)
            with reader:  # closes the file handle even when we stop early
                for chunk in reader:
                    results.append(pool.apply_async(
                        filter_chunk_limited,
                        args=(chunk, column_name, value_set, max_filt, total_rows)))
                    found = total_rows.value
                    pbar.set_description(f"Filtering Rows (Total Rows: {found}/{max_filt})")
                    pbar.update(1)  # one tick per submitted chunk
                    if found >= max_filt:
                        break

            pool.close()
            pool.join()  # let already-submitted chunks finish
            # Workers may have claimed more rows while we waited, so show the final count.
            pbar.set_description(f"Filtering Rows (Total Rows: {total_rows.value}/{max_filt})")
    finally:
        # No-op after a clean close()/join(); stops the workers if anything above raised.
        pool.terminate()

    # Each task returns (rows, done); fetch each result once (get() re-raises worker errors).
    frames = [rows for rows, _done in (res.get() for res in results) if not rows.empty]
    if not frames:
        # pd.concat([]) raises ValueError, so return an empty frame explicitly.
        return pd.DataFrame(columns=usecols)
    return pd.concat(frames, ignore_index=True)

# Example usage
# NOTE: uses `result` produced by the occurrence-filtering cell above; run that cell first.

# Step 1: collect the gbifIDs found by the occurrence search into a set
columnX = 'gbifID'  # key column read from both occurrence.txt and multimedia.txt
S = extract_column_values_to_set(result, columnX)

# Step 2: read multimedia.txt and keep rows whose gbifID is in S, at most max_filt rows
second_file_path = '/workdir/datasets/GBIF/multimedia.txt'  # tab-separated GBIF multimedia table
max_filt = 100  # keep at most 100 multimedia rows
filtered_rows = filter_rows_by_set_limited(second_file_path, columnX, S, max_filt=max_filt)

# Display the filtered rows
print(filtered_rows)


Filtering Rows (Total Rows: 100/100): : 0it [00:36, ?it/s]


        gbifID                                         identifier
0   1228033290  https://iiif.rbge.org.uk/herb/iiif/E00143531/m...
1   1228033290  https://iiif.rbge.org.uk/herb/iiif/E00143531/f...
2   1668888820  https://inaturalist-open-data.s3.amazonaws.com...
3   1668888820  https://inaturalist-open-data.s3.amazonaws.com...
4   1668888820  https://inaturalist-open-data.s3.amazonaws.com...
..         ...                                                ...
95  3112872421  https://inaturalist-open-data.s3.amazonaws.com...
96  3113314776  https://inaturalist-open-data.s3.amazonaws.com...
97  3301821556  https://inaturalist-open-data.s3.amazonaws.com...
98  3301821556  https://inaturalist-open-data.s3.amazonaws.com...
99  3302054575  https://inaturalist-open-data.s3.amazonaws.com...

[100 rows x 2 columns]


In [44]:
# Download the images whose URLs are in filtered_rows['identifier'] into save_dir, several at a time using a worker pool (not asyncio).
import os 
import requests

def download_image(url, save_path, timeout=30):
    """Download ``url`` and write the response body to ``save_path``.

    Args:
        url: image URL.
        save_path: destination file path (overwritten if it exists).
        timeout: seconds to wait for the server before giving up (default 30).

    Returns:
        True when the file was written. False when the request failed (network error,
        timeout, or non-2xx status); in that case nothing is written, so an HTTP error
        page is never saved under an image filename.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as exc:
        # Report and continue: one bad URL should not abort the whole batch.
        print(f"Failed to download {url}: {exc}")
        return False
    with open(save_path, 'wb') as file:
        file.write(response.content)
    return True
        
def download_images_async(filtered_rows, save_dir, num_workers=10):
    """Download every URL in ``filtered_rows['identifier']`` into ``save_dir``.

    Files are named after the row's ``gbifID``. One occurrence can have several
    multimedia rows (the same gbifID repeated), so the first file keeps the plain
    ``<gbifID>.jpg`` name and later ones become ``<gbifID>_<n>.jpg`` instead of
    silently overwriting it.

    Args:
        filtered_rows: DataFrame with ``gbifID`` and ``identifier`` columns.
        save_dir: existing directory to write the files into.
        num_workers: number of concurrent downloads (default 10).
    """
    # Downloads are I/O-bound, so threads suffice. This also avoids depending on the
    # `mp` alias imported by another cell.
    from multiprocessing.pool import ThreadPool

    urls = filtered_rows['identifier'].tolist()
    seen = {}
    save_paths = []
    for name in filtered_rows['gbifID'].tolist():
        count = seen.get(name, 0)
        seen[name] = count + 1
        stem = f"{name}" if count == 0 else f"{name}_{count}"
        save_paths.append(os.path.join(save_dir, f"{stem}.jpg"))

    with ThreadPool(num_workers) as pool:
        pool.starmap(download_image, zip(urls, save_paths))
        
# Example usage: one directory level per query item, e.g. /workdir/download/taxonKey/2930137
path_parts = (part for key, val in query.items() for part in (key, str(val)))
save_dir = '/workdir/download/' + '/'.join(path_parts)  # Directory to save images
os.makedirs(save_dir, exist_ok=True)

download_images_async(filtered_rows, save_dir)
print(f"Downloaded {len(filtered_rows)} images to {save_dir}")


Downloaded 100 images to /workdir/download/taxonKey/2930137


In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m32.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=8c26d1ad0721feaab24fbf03f8a0002c6eb2f471d8451f8f7b7288394e6e189b
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3
[0m

In [2]:
# Spark SQL
! pip install pyspark[sql]
# pandas API on Spark
! pip install pyspark[pandas_on_spark] plotly  # to plot your data, you can install plotly together.
# Spark Connect
! pip install pyspark[connect]

[0mCollecting plotly
  Downloading plotly-5.24.1-py3-none-any.whl.metadata (7.3 kB)
Downloading plotly-5.24.1-py3-none-any.whl (19.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.1/19.1 MB[0m [31m53.2 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hInstalling collected packages: plotly
Successfully installed plotly-5.24.1
Collecting grpcio>=1.56.0 (from pyspark[connect])
  Downloading grpcio-1.68.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.9 kB)
Collecting grpcio-status>=1.56.0 (from pyspark[connect])
  Downloading grpcio_status-1.68.0-py3-none-any.whl.metadata (1.1 kB)
Collecting googleapis-common-protos>=1.56.4 (from pyspark[connect])
  Downloading googleapis_common_protos-1.66.0-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading googleapis_common_protos-1.66.0-py2.py3-none-any.whl (221 kB)
Downloading grpcio-1.68.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (5.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

# USING PYSPARK

In [1]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/29 16:02:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the GBIF tables: tab-separated with a header row. Quote handling is disabled
# (quote="") to match the pandas readers in the multiprocessing section
# (csv.QUOTE_NONE), so '"' characters are treated as ordinary data. Without
# inferSchema, every column is read as a string.
gbif_csv_options = {"delimiter": "\t", "header": True, "quote": ""}
df = spark.read.options(**gbif_csv_options).csv('/workdir/datasets/GBIF/occurrence.txt')
dfm = spark.read.options(**gbif_csv_options).csv('/workdir/datasets/GBIF/multimedia.txt')

In [3]:
df.createOrReplaceTempView("occurrence")
dfm.createOrReplaceTempView("multimedia")

# Spark version of the multiprocessing pipeline: take up to OCCURRENCE_LIMIT gbifIDs of
# one taxon, then up to MULTIMEDIA_LIMIT multimedia rows whose gbifID is among them.
# LIMIT without ORDER BY does not guarantee which rows are kept from run to run.
TAXON_KEY = 2930137
OCCURRENCE_LIMIT = 1000
MULTIMEDIA_LIMIT = 200  # NOTE(review): the multiprocessing version keeps 100 rows; confirm which is intended
occurrence_filtered = spark.sql(
    f"SELECT gbifID FROM occurrence WHERE taxonKey = {int(TAXON_KEY)} LIMIT {int(OCCURRENCE_LIMIT)}"
)
occurrence_filtered.createOrReplaceTempView("occurrence_filtered")
sqlDF = spark.sql(
    "SELECT gbifID, identifier FROM multimedia "
    "WHERE gbifID IN (SELECT gbifID FROM occurrence_filtered) "
    f"LIMIT {int(MULTIMEDIA_LIMIT)}"
)

24/11/29 16:02:41 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Spark evaluates sqlDF lazily: each action re-runs the query over the source files.
# Collect the (small, LIMITed) result to the driver once and reuse it.
result = sqlDF.collect()
# Pass the schema explicitly: createDataFrame cannot infer one from an empty list,
# which is what collect() returns when the taxon has no multimedia rows.
dfr = spark.createDataFrame(result, schema=sqlDF.schema)

                                                                                

In [None]:
# Download the images selected by the Spark query.
# NOTE: this section starts from a fresh kernel (In [1]); `download_images_async` comes
# from the image-download cell of the multiprocessing section, so run that cell first.
import os

query = {'taxonKey': 2930137}  # must match the taxonKey used in the Spark SQL query
save_dir = '/workdir/download/' + '/'.join(f'{key}/{val}' for key, val in query.items())  # Directory to save images
os.makedirs(save_dir, exist_ok=True)

# Use the Spark result (gbifID, identifier) rather than `filtered_rows` from the
# multiprocessing section, which does not exist in this kernel.
spark_rows = dfr.toPandas()
download_images_async(spark_rows, save_dir)
print(f"Downloaded {len(spark_rows)} images to {save_dir}")