In [5]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Write ORC with Bloom Filters") \
    .getOrCreate()

# Load CSV file
csv_path = "/home/jovyan/work/stores.csv"
df = spark.read.option("header", True).option("inferSchema", True).csv(csv_path)

# Show schema and preview data (optional)
df.printSchema()
df.show()

# ✅ Define output path inside warehouse
output_path = "/home/jovyan/warehouse/db2/stores_orc"

# Write ORC with Bloom filters
df.write \
    .mode("overwrite") \
    .option("orc.bloom.filter.columns", "store_id,store_name") \
    .option("orc.bloom.filter.fpp", "0.01") \
    .orc(output_path)

print("✅ ORC file with Bloom filters written at:", output_path)


root
 |-- store_id: integer (nullable = true)
 |-- store_name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+--------+----------+----------+----------+
|store_id|store_name|  latitude| longitude|
+--------+----------+----------+----------+
|       1|   Store_1|  0.511931| 58.357941|
|       2|   Store_2|-36.995204|  96.52245|
|       3|   Store_3|  1.710979| 136.72836|
|       4|   Store_4| 54.362306| 29.325625|
|       5|   Store_5| 27.621896|104.960096|
|       6|   Store_6|-45.140762| 49.248075|
|       7|   Store_7|-25.188553|121.121056|
|       8|   Store_8| -7.761529| 39.723312|
|       9|   Store_9|-12.534165| 67.932349|
|      10|  Store_10| 29.997698|  49.64388|
|      11|  Store_11| 50.647739| 41.553978|
|      12|  Store_12|  48.74235| 40.406987|
|      13|  Store_13|-15.040655| 37.568223|
|      14|  Store_14|-12.551707|121.601048|
|      15|  Store_15| 67.785301| 22.227968|
|      16|  Store_16| -7.946656| 52.70

In [None]:
df = spark.sql("SELECT * FROM stores WHERE store_id = 1001").show()


+--------+----------+----------+----------+
|store_id|store_name|  latitude| longitude|
+--------+----------+----------+----------+
|    1001|Store_1001|-45.310839|135.491436|
+--------+----------+----------+----------+



In [33]:
df = spark.sql("SELECT * FROM stores WHERE store_id = 10010000").show()

+--------+----------+--------+---------+
|store_id|store_name|latitude|longitude|
+--------+----------+--------+---------+
+--------+----------+--------+---------+



In [9]:
df = spark.read.orc("/home/jovyan/warehouse/db2/stores_orc")
df.createOrReplaceTempView("stores")
spark.sql("SELECT * FROM stores WHERE store_id = 9999").show()

+--------+----------+---------+----------+
|store_id|store_name| latitude| longitude|
+--------+----------+---------+----------+
|    9999|Store_9999|28.666345|100.853035|
+--------+----------+---------+----------+



In [10]:
spark.sql("SELECT * FROM stores WHERE store_id BETWEEN 1000 AND 2000").show()

+--------+----------+----------+----------+
|store_id|store_name|  latitude| longitude|
+--------+----------+----------+----------+
|    1000|Store_1000| 48.929564| 84.683002|
|    1001|Store_1001|-45.310839|135.491436|
|    1002|Store_1002| -5.560226| 39.690047|
|    1003|Store_1003| 30.876063|102.868682|
|    1004|Store_1004| 68.899919|124.811859|
|    1005|Store_1005| 14.270271|135.468256|
|    1006|Store_1006|-32.650969| 65.364078|
|    1007|Store_1007|-20.835574| 49.316708|
|    1008|Store_1008|-43.921252|130.086826|
|    1009|Store_1009|   1.78585| 50.713614|
|    1010|Store_1010|-29.727588| 20.171948|
|    1011|Store_1011|-19.056635|125.315733|
|    1012|Store_1012| 42.442496| 97.082303|
|    1013|Store_1013|-39.333889|131.510676|
|    1014|Store_1014| 71.499269| 67.577959|
|    1015|Store_1015| -27.41883| 26.599881|
|    1016|Store_1016| -8.594115|124.142973|
|    1017|Store_1017|-37.964524| 67.419656|
|    1018|Store_1018|-45.779676| 32.392872|
|    1019|Store_1019|  9.177237|

In [21]:
df2 = spark.sql("SELECT * FROM stores where latitude > 55 and longitude < 20")
df2.show()
df2.count()


+--------+----------+---------+---------+
|store_id|store_name| latitude|longitude|
+--------+----------+---------+---------+
|     726| Store_726|60.979219|18.152492|
|     968| Store_968|68.735934|19.833183|
|    1184|Store_1184|57.927689|18.069964|
|    1337|Store_1337|68.193193|19.259823|
|    1599|Store_1599|57.869179|18.361071|
|    1716|Store_1716|72.172562|18.458788|
|    1829|Store_1829|64.164541|18.607604|
|    1844|Store_1844|61.409051|19.052851|
|    1954|Store_1954|60.519879|19.931171|
|    1997|Store_1997|56.350953|17.690688|
|    2092|Store_2092|61.092167|19.154016|
|    2308|Store_2308|59.617544|18.425325|
|    2611|Store_2611|71.579405|18.198438|
|    2836|Store_2836|57.019163|19.894856|
|    3179|Store_3179|57.500284|18.182368|
|    3328|Store_3328|68.897401|18.753454|
|    3428|Store_3428|70.136871|17.696811|
|    3605|Store_3605|68.041155|18.715164|
|    3797|Store_3797|56.260253|19.868176|
|    4155|Store_4155|73.309743|18.363168|
+--------+----------+---------+---

4282

In [72]:
import pyorc

orc_file = "/home/jovyan/warehouse/db2/stores_orc/part-00005-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc"


with open(orc_file, "rb") as f:
    reader = pyorc.Reader(f)

    # Schema
    print("Schema:", reader.schema)

    stride = reader.row_index_stride
    all_rows = list(reader)
    total_rows = len(all_rows)
    print("Number of rows:", total_rows)

    # Stripe count
    print("✅ Compression:", reader.compression.name) 
    print("✅ Compression Block Size:", reader.compression_block_size)
    print("✅ Format Version:", reader.format_version)
    print("✅ Number of Stripes:", reader.num_of_stripes)
    print(reader.read_stripe)
    print(reader.bytes_lengths)
    print(reader.user_metadata)


Schema: struct<store_id:int,store_name:string,latitude:double,longitude:double>
Number of rows: 163554
✅ Compression: SNAPPY
✅ Compression Block Size: 262144
✅ Format Version: (0, 12)
✅ Number of Stripes: 1
<bound method Reader.read_stripe of <pyorc.reader.Reader object at 0xffff48b50890>>
{'content_length': 3743210, 'file_footer_length': 298, 'file_postscript_length': 25, 'file_length': 3743695, 'stripe_statistics_length': 161}
{'org.apache.spark.version': b'3.5.0'}


In [39]:

from pyspark.sql import SparkSession
from pybloom_live import BloomFilter

# Create Spark session
spark = SparkSession.builder.appName("BloomFilterCheck").getOrCreate()

# Read ORC file
df = spark.read.orc("/home/jovyan/warehouse/db2/stores_orc/part-00003-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc")

# Collect store_id values locally (assumes it's not huge)
store_ids = df.select("store_id").rdd.map(lambda row: row["store_id"]).collect()

# Create a local Bloom filter
bf = BloomFilter(capacity=1000000, error_rate=0.01)

# Populate Bloom filter
for store_id in store_ids:
    bf.add(store_id)

# Check if a store_id might be present
check_id = 496164
print(f"Might contain {check_id}? ->", check_id in bf)


Might contain 496164? -> True


In [38]:
df = spark.read.orc("/home/jovyan/warehouse/db2/stores_orc/part-00003-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc").show()

+--------+------------+----------+----------+
|store_id|  store_name|  latitude| longitude|
+--------+------------+----------+----------+
|  496164|Store_496164|  54.16662| 96.719518|
|  496165|Store_496165| 20.043097| 95.314145|
|  496166|Store_496166| 59.636568|    96.726|
|  496167|Store_496167|-38.094901|130.127986|
|  496168|Store_496168|-45.216044| 47.550081|
|  496169|Store_496169| 51.802525|136.360378|
|  496170|Store_496170| 55.026124| 61.290989|
|  496171|Store_496171|-46.327335| 54.417178|
|  496172|Store_496172| 41.134244| 73.314949|
|  496173|Store_496173| 46.194376| 71.531106|
|  496174|Store_496174| -1.726265|111.354441|
|  496175|Store_496175|-31.114968|  24.28303|
|  496176|Store_496176| -4.361737| 94.675499|
|  496177|Store_496177| 65.778544| 42.340798|
|  496178|Store_496178| 44.074423| 70.056797|
|  496179|Store_496179|-38.637158| 51.914682|
|  496180|Store_496180| 23.301149| 47.103871|
|  496181|Store_496181| 26.623572| 43.692912|
|  496182|Store_496182| 48.348829|

In [43]:
from pyspark.sql import SparkSession
import pandas as pd
from pybloom_live import BloomFilter

# Initialize Spark Session
spark = SparkSession.builder.appName("StripeStatsBloomDF").getOrCreate()

# Read ORC file
df = spark.read.orc("/home/jovyan/warehouse/db2/stores_orc/part-00003-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc")

# Optional: Repartition to simulate stripe-like chunks
df = df.repartition(4)

# Define function to apply on each partition using Pandas
def bloom_partition_stats(pdf_iter):
    for pdf in pdf_iter:
        if 'store_id' not in pdf.columns or pdf.empty:
            continue
        
        store_ids = pdf['store_id'].dropna().tolist()
        if not store_ids:
            continue

        bf = BloomFilter(capacity=10000000, error_rate=0.01)
        for sid in store_ids:
            bf.add(sid)

        # We can't return BloomFilter directly, so store needed info
        yield pd.DataFrame([{
            "count": len(store_ids),
            "min": min(store_ids),
            "max": max(store_ids),
            "bloom_test_496164": 496164 in bf
        }])

# Use mapInPandas to process partitions
result_df = df.select("store_id").mapInPandas(bloom_partition_stats, schema="count int, min long, max long, bloom_test_496164 boolean")

# Show results
result_df.show(truncate=False)



+-----+------+------+-----------------+
|count|min   |max   |bloom_test_496164|
+-----+------+------+-----------------+
|10000|496165|659719|false            |
|10000|496173|659709|false            |
|10000|496166|659714|false            |
|10000|496183|659718|false            |
|889  |496167|659617|false            |
|10000|496172|659707|false            |
|10000|496179|659711|false            |
|10000|496169|659684|false            |
|10000|496174|659702|false            |
|889  |496580|659616|false            |
|10000|496168|659708|false            |
|10000|496177|659697|false            |
|10000|496205|659713|false            |
|10000|496171|659716|false            |
|889  |496190|659636|false            |
|10000|496185|659698|false            |
|10000|496175|659715|false            |
|10000|496164|659689|true             |
|10000|496181|659712|false            |
|889  |496263|659594|false            |
+-----+------+------+-----------------+



In [74]:
import pyorc

# Assuming you have an ORC file named 'my_data.orc'
with open('/home/jovyan/warehouse/db2/stores_orc/part-00005-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc', 'rb') as orc_file:
    reader = pyorc.Reader(orc_file)

    # Iterate through each stripe
    for stripe_index in range(reader.num_of_stripes):
        stripe = reader.read_stripe(stripe_index)

        # Check for columns with Bloom filters in the current stripe
        if stripe.bloom_filter_columns:
            print(f"Stripe {stripe_index} has Bloom filters on columns: {stripe.row_offset}")

           
        else:
            print(f"Stripe {stripe_index} does not have Bloom filters on any columns.")

Stripe 0 has Bloom filters on columns: 0


In [80]:
import pyorc
from collections import defaultdict

orc_path = "/home/jovyan/warehouse/db2/stores_orc/part-00005-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc"

# Holds stats per column
stats = defaultdict(lambda: {"min": None, "max": None, "nulls": 0, "count": 0})

with open(orc_path, "rb") as data:
    reader = pyorc.Reader(data)
    schema = reader.schema
    print("Schema:", schema)
    print("Compression:", reader.compression.name)
    print("Compression Block Size:", reader.compression_block_size)
    print("Stripe count:", reader.num_of_stripes)
    #print("Total rows:", reader.number_of_rows)

    if hasattr(schema, "fields"):
        field_names = list(schema.fields.keys())
    else:
        field_names = [f"col{i}" for i in range(len(reader.schema))]  # fallback

    # ✅ Iterate over each row and calculate stats
    for row in reader:
        for i, value in enumerate(row):
            field = field_names[i]
            if value is None:
                stats[field]["nulls"] += 1
            else:
                stats[field]["count"] += 1
                if stats[field]["min"] is None or value < stats[field]["min"]:
                    stats[field]["min"] = value
                if stats[field]["max"] is None or value > stats[field]["max"]:
                    stats[field]["max"] = value

# ✅ Print results
print("\nColumn Stats:")
for col, stat in stats.items():
    print(f"{col}: {stat}")


Schema: struct<store_id:int,store_name:string,latitude:double,longitude:double>
Compression: SNAPPY
Compression Block Size: 262144
Stripe count: 1

Column Stats:
store_id: {'min': 823269, 'max': 986822, 'nulls': 0, 'count': 163554}
store_name: {'min': 'Store_823269', 'max': 'Store_986822', 'nulls': 0, 'count': 163554}
latitude: {'min': -47.526959, 'max': 73.470142, 'nulls': 0, 'count': 163554}
longitude: {'min': 17.095456, 'max': 138.092932, 'nulls': 0, 'count': 163554}


In [87]:
import pyorc
from collections import defaultdict

orc_path = "/home/jovyan/warehouse/db2/stores_orc/part-00005-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc"

with open(orc_path, "rb") as data:
    reader = pyorc.Reader(data)
    schema = reader.schema
    stripe_count = reader.num_of_stripes

    print("Schema:", schema)
    print("Compression:", reader.compression.name)
    print("Compression Block Size:", reader.compression_block_size)
    print("Stripe count:", stripe_count)
    print("Bytes lengths:", reader.bytes_lengths)
    print(reader.user_metadata)
    print(reader.row_index_stride)
    print(reader.__len__())
    print(stripe.__len__())
    print(stripe.bytes_offset)
    print(stripe.row_offset)

    field_names = list(schema.fields.keys()) if hasattr(schema, "fields") else [f"col{i}" for i in range(len(reader.schema))]

    # Simulate per-stripe stats
    for stripe_index in range(stripe_count):
        stripe_stats = defaultdict(lambda: {"min": None, "max": None, "nulls": 0, "count": 0})
        rows = reader.read_stripe(stripe_index)

        print(f"\n📦 Stripe {stripe_index} (Rows: {len(rows)})")
        for row in rows:
            for i, value in enumerate(row):
                field = field_names[i]
                if value is None:
                    stripe_stats[field]["nulls"] += 1
                else:
                    stripe_stats[field]["count"] += 1
                    if stripe_stats[field]["min"] is None or value < stripe_stats[field]["min"]:
                        stripe_stats[field]["min"] = value
                    if stripe_stats[field]["max"] is None or value > stripe_stats[field]["max"]:
                        stripe_stats[field]["max"] = value

        # Print stripe stats
        for field in field_names:
            stat = stripe_stats[field]
            print(f"  ▸ {field}: min={stat['min']} max={stat['max']} nulls={stat['nulls']} count={stat['count']}")


Schema: struct<store_id:int,store_name:string,latitude:double,longitude:double>
Compression: SNAPPY
Compression Block Size: 262144
Stripe count: 1
Bytes lengths: {'content_length': 3743210, 'file_footer_length': 298, 'file_postscript_length': 25, 'file_length': 3743695, 'stripe_statistics_length': 161}
{'org.apache.spark.version': b'3.5.0'}
10000
163554
163554
3
0

📦 Stripe 0 (Rows: 163554)
  ▸ store_id: min=823269 max=986822 nulls=0 count=163554
  ▸ store_name: min=Store_823269 max=Store_986822 nulls=0 count=163554
  ▸ latitude: min=-47.526959 max=73.470142 nulls=0 count=163554
  ▸ longitude: min=17.095456 max=138.092932 nulls=0 count=163554


In [93]:
import pyorc
from collections import defaultdict

orc_path = "/home/jovyan/warehouse/db2/stores_orc/part-00005-9aa8ec4a-3763-4967-be92-2507c58a6b83-c000.snappy.orc"

with open(orc_path, "rb") as data:
    reader = pyorc.Reader(data)
    schema = reader.schema
    stripe_count = reader.num_of_stripes

    print("Schema:", schema)
    print("Compression:", reader.compression.name)
    print("Compression Block Size:", reader.compression_block_size)
    print("Stripe count:", stripe_count)
    print("Total Rows", reader.__len__())

    
    try:
        # This will raise an error because `stripe` is not defined
        
        print("stripe.bytes_offset:", stripe.bytes_offset)
        print("stripe.row_offset:", stripe.row_offset)
    except NameError:
        print("⚠️  'stripe' is not defined — these attributes are not accessible via pyorc.")

    # Get column names
    field_names = list(schema.fields.keys()) if hasattr(schema, "fields") else [f"col{i}" for i in range(len(reader.schema))]

    # Simulate per-stripe stats
    for stripe_index in range(stripe_count):
        stripe_stats = defaultdict(lambda: {"min": None, "max": None, "nulls": 0, "count": 0})
        rows = reader.read_stripe(stripe_index)

        print(f"\n📦 Stripe {stripe_index} (Rows: {len(rows)})")
        for row in rows:
            for i, value in enumerate(row):
                field = field_names[i]
                if value is None:
                    stripe_stats[field]["nulls"] += 1
                else:
                    stripe_stats[field]["count"] += 1
                    if stripe_stats[field]["min"] is None or value < stripe_stats[field]["min"]:
                        stripe_stats[field]["min"] = value
                    if stripe_stats[field]["max"] is None or value > stripe_stats[field]["max"]:
                        stripe_stats[field]["max"] = value

        # Print stripe stats
        for field in field_names:
            stat = stripe_stats[field]
            print(f"  ▸ {field}: min={stat['min']} max={stat['max']} nulls={stat['nulls']} count={stat['count']}")


Schema: struct<store_id:int,store_name:string,latitude:double,longitude:double>
Compression: SNAPPY
Compression Block Size: 262144
Stripe count: 1
Total Rows 163554
stripe.bytes_offset: 3
stripe.row_offset: 0

📦 Stripe 0 (Rows: 163554)
  ▸ store_id: min=823269 max=986822 nulls=0 count=163554
  ▸ store_name: min=Store_823269 max=Store_986822 nulls=0 count=163554
  ▸ latitude: min=-47.526959 max=73.470142 nulls=0 count=163554
  ▸ longitude: min=17.095456 max=138.092932 nulls=0 count=163554
