Pipeline

In [1]:
!pip install pyspark

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType


from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.types as tp

# Create a Spark session
spark = SparkSession.builder \
    .appName("DiabetesPredictionPipeline") \
    .getOrCreate()

# Read the CSV file
my_data = spark.read.csv('/content/diabaties.csv', header=True)

# Define the schema for the data
my_schema = tp.StructType([
    tp.StructField(name='Pregnancies', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Glucose', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BloodPressure', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='SkinThickness', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Insulin', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BMI', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='DiabetesPedigreeFunction', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='Age', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Outcome', dataType=tp.IntegerType(), nullable=True)
])

# Read the data again with the defined schema
my_data = spark.read.csv('/content/diabaties.csv', schema=my_schema, header=True)

# Print the schema
my_data.printSchema()

# Define stages for the pipeline
imputer = Imputer(
    inputCols=my_data.columns,
    outputCols=["{}_imputed".format(c) for c in my_data.columns]
).setStrategy("median")

assembler = VectorAssembler(
    inputCols=['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
               'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'],
    outputCol='features'
)

lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10)

# Create the pipeline
pipeline = Pipeline(stages=[imputer, assembler, lr])

# Split the data into training and test sets
xtrain, xtest = my_data.randomSplit([0.7, 0.3])

# Fit the pipeline on training data
pipeline_model = pipeline.fit(xtrain)

# Make predictions on the test data
predictions = pipeline_model.transform(xtest)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
     -------------------------------------- 317.3/317.3 MB 3.0 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     ------------------------------------ 200.5/200.5 kB 715.2 kB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840653 sha256=1eca8f6cc21518c0fd962cb28c7b12eb6675a26792b692c6647942ce5dc9bcd9
  Stored in directory: c:\users\aditya\appdata\local\pip\cache\wheels\2e\d2\18\6f4f20e8332359f7fffceb6828edcc80ef96f86744192a7bb9
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.3




AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/diabaties.csv.

K-Shingles

In [None]:
import pandas as pd

def k_shingles(text, k):
    words = text.split()
    shingles = set()
    for i in range(len(words) - k + 1):
        shingle = ' '.join(words[i:i + k])
        shingles.add(shingle)
    return shingles

# Define the hash functions
def hash_function_1(x):
    return (3 * x + 5) % 554

def hash_function_2(x):
    return (7 * x + 4) % 554

def hash_function_3(x):
    return (3 * x + 1) % 554

# File paths
file_paths = [
    "/content/shingle1.txt",
    "/content/shingle2.txt",
    "/content/shingle3.txt"
]

# Token size
k = 5

# Dictionary to hold shingles for each file
shingle_dict = {}

# Process each file
for file_path in file_paths:
    with open(file_path, 'r') as file:
        content = file.read()

    shingles = k_shingles(content, k)
    shingle_dict[file_path] = shingles

# Create a set of all unique shingles across all files
all_shingles = list(set.union(*[set(shingles) for shingles in shingle_dict.values()]))

# Initialize an incidence matrix
incidence_matrix = pd.DataFrame(0, index=all_shingles, columns=file_paths)

# Populate the incidence matrix
for file_path, shingles in shingle_dict.items():
    for shingle in shingles:
        incidence_matrix.at[shingle, file_path] = 1

# Function to compute signatures
def compute_signature(incidence_matrix):
    signatures = {file_path: [] for file_path in incidence_matrix.columns}

    for shingle in incidence_matrix.index:
        # Use the index of the shingle in the all_shingles list
        index = list(incidence_matrix.index).index(shingle)

        for file_path in incidence_matrix.columns:
            # Calculate hash values
            hash_value = [
                hash_function_1(index),
                hash_function_2(index),
                hash_function_3(index)
            ]
            # Append hash values if the shingle is present in the file
            if incidence_matrix.at[shingle, file_path] == 1:
                signatures[file_path].append(hash_value)

    # Find the minimum hash value for each hash function per file
    final_signatures = {}
    for file_path, values in signatures.items():
        if values:  # If there are any values
            final_signatures[file_path] = [
                min(value[0] for value in values),  # Min for Hash1
                min(value[1] for value in values),  # Min for Hash2
                min(value[2] for value in values)   # Min for Hash3
            ]
        else:
            final_signatures[file_path] = [None, None, None]  # Handle case with no shingles

    return pd.DataFrame(final_signatures, index=['Hash1', 'Hash2', 'Hash3']).T

# Compute signatures
signature_matrix = compute_signature(incidence_matrix)

# Display the incidence matrix and signature matrix
print("Incidence Matrix:")
print(incidence_matrix)

print("\nSignature Matrix:")
print(signature_matrix)



Incidence Matrix:
                                             /content/shingle1.txt  \
combat crime and so on".[8]                                      0   
of data now available are                                        0   
sometimes loosely partly due to                                  1   
for large enterprises is determining                             0   
many entries (rows) offer greater                                1   
...                                                            ...   
estimated to reach $215.7 billion                                0   
or insightfulness of the data.[5]                                1   
data presents challenges in sampling,                            1   
devices, aerial (remote sensing) equipment,                      0   
analytics methods that extract value                             0   

                                             /content/shingle2.txt  \
combat crime and so on".[8]                                      1   
o

Bloom Filter

In [None]:
class BloomFilter:
    def __init__(self, size):
        self.size = size
        self.bit_array = [0] * size

    def hash1(self, x):
        return (x + 1) % self.size

    def hash2(self, x):
        return (2 * x + 5) % self.size

    def add(self, x):
        index1 = self.hash1(x)
        index2 = self.hash2(x)
        self.bit_array[index1] = 1
        self.bit_array[index2] = 1

    def check(self, x):
        index1 = self.hash1(x)
        index2 = self.hash2(x)
        return self.bit_array[index1] == 1 and self.bit_array[index2] == 1

# Initialize Bloom filter
bloom_filter = BloomFilter(13)

# Add elements 8, 17, 25, 14, 20 to the Bloom filter
elements_to_add = [8, 17, 25, 14, 20]
for elem in elements_to_add:
    bloom_filter.add(elem)

# Check for integers 7 and 5
check_elements = [7, 5]
for elem in check_elements:
    if bloom_filter.check(elem):
        print(f"Element {elem} may be in the set.")
    else:
        print(f"Element {elem} is definitely not in the set.")


Element 7 may be in the set.
Element 5 may be in the set.


AMS(With Given Values)

In [None]:
import random

# AMS Algorithm Implementation
def ams_algorithm(stream, x_values):
    n = len(stream)

    # Initialize the sum of square estimates
    sum_squared_estimates = 0

    # Perform the AMS estimate for each x_value
    for x in x_values:
        # Choose a random element r from the stream
        r = stream[x - 1]  # x is 1-indexed, so we use x-1 for 0-indexed lists

        # Count the number of times r appears in the stream
        count_r = stream.count(r)

        # Calculate the square of count_r and update the sum of square estimates
        sum_squared_estimates += n * (2 * count_r - 1)

    # Return the average of the square estimates
    return sum_squared_estimates / len(x_values)

# Given stream and x values
stream = [2, 3, 7, 1, 5, 8, 5, 7, 9, 6, 4, 4, 5, 6, 5, 8, 8, 5,2, 2, 2, 1, 1, 6, 7]
x_values = [1, 3, 5, 10]  # Values of x1, x2, x3, and x4 as 1, 3, 5, and 10

# Calculate the surprise number using AMS algorithm
surprise_number = ams_algorithm(stream, x_values)

# Output the result
print("The surprise number (Second Frequency Moment Estimate) is:", surprise_number)


The surprise number (Second Frequency Moment Estimate) is: 162.5


AMS(with Random numbers)

In [None]:
import random

# AMS Algorithm Implementation
def ams_algorithm(stream, x_values):
    n = len(stream)

    # Initialize the sum of square estimates
    sum_squared_estimates = 0

    # Perform the AMS estimate for each x_value
    for x in x_values:
        # Choose a random element r from the stream
        r = stream[x - 1]  # x is 1-indexed, so we use x-1 for 0-indexed lists

        # Count the number of times r appears in the stream
        count_r = stream.count(r)

        # Calculate the square of count_r and update the sum of square estimates
        sum_squared_estimates += n * (2 * count_r - 1)

    # Return the average of the square estimates
    return sum_squared_estimates / len(x_values)

# Generate a random stream of integers
random_stream = [random.randint(1, 10) for _ in range(25)]

# Given x values
x_values = [1, 3, 5, 10]  # Values of x1, x2, x3, and x4 as 1, 3, 5, and 10

# Calculate the surprise number using AMS algorithm on the random stream
surprise_number = ams_algorithm(random_stream, x_values)

# Output the result
print("The random stream is:", random_stream)
print("The surprise number (Second Frequency Moment Estimate) is:", surprise_number)


The random stream is: [7, 10, 6, 8, 8, 6, 7, 6, 2, 3, 7, 9, 4, 8, 1, 7, 10, 8, 1, 9, 6, 5, 7, 9, 5]
The surprise number (Second Frequency Moment Estimate) is: 150.0


Flajolet Martin algo

In [None]:
def hash_value(x, a, b, m):
    # Hash function: (a*x + b) mod m
    return (a * x + b) % m

def tail_length(bin_str):
    # Find the length of trailing zeros in the binary representation of the hash value
    return len(bin_str) - len(bin_str.rstrip('0'))

def flajolet_martin(stream, a, b, m):
    max_tail_length = 0
    tail_lengths = []  # List to store tail lengths for each element

    for element in stream:
        # Hash the element and convert to binary
        hash_val = hash_value(element, a, b, m)
        bin_hash = format(hash_val, 'b').zfill(len(bin(hash_val)[2:]))  # Convert to binary and pad with zeros

        # Calculate the tail length of the binary hash
        t_len = tail_length(bin_hash)
        tail_lengths.append((element, hash_val, bin_hash, t_len))  # Store element, hash, and tail length

        # Keep track of the maximum tail length
        max_tail_length = max(max_tail_length, t_len)

    # Print the tail lengths
    for elem, h_val, bin_h, t_len in tail_lengths:
        print(f"Element: {elem}, Hash: {h_val}, Binary Hash: {bin_h}, Tail Length: {t_len}")

    # Estimate the number of distinct elements
    return 2 ** max_tail_length

# Example usage
stream = [3, 1, 4, 1, 5, 9, 2, 6, 5]

# Use hash function 3x + 7 mod 32
a1, b1, m1 = 3, 7, 32
estimate1 = flajolet_martin(stream, a1, b1, m1)
print(f"\nEstimated number of distinct elements with hash function 3x + 7 mod 32: {estimate1}")

# Use hash function 2x + 1 mod 32
a2, b2, m2 = 2, 1, 32
estimate2 = flajolet_martin(stream, a2, b2, m2)
print(f"\nEstimated number of distinct elements with hash function 2x + 1 mod 32: {estimate2}")


Element: 3, Hash: 16, Binary Hash: 10000, Tail Length: 4
Element: 1, Hash: 10, Binary Hash: 1010, Tail Length: 1
Element: 4, Hash: 19, Binary Hash: 10011, Tail Length: 0
Element: 1, Hash: 10, Binary Hash: 1010, Tail Length: 1
Element: 5, Hash: 22, Binary Hash: 10110, Tail Length: 1
Element: 9, Hash: 2, Binary Hash: 10, Tail Length: 1
Element: 2, Hash: 13, Binary Hash: 1101, Tail Length: 0
Element: 6, Hash: 25, Binary Hash: 11001, Tail Length: 0
Element: 5, Hash: 22, Binary Hash: 10110, Tail Length: 1

Estimated number of distinct elements with hash function 3x + 7 mod 32: 16
Element: 3, Hash: 7, Binary Hash: 111, Tail Length: 0
Element: 1, Hash: 3, Binary Hash: 11, Tail Length: 0
Element: 4, Hash: 9, Binary Hash: 1001, Tail Length: 0
Element: 1, Hash: 3, Binary Hash: 11, Tail Length: 0
Element: 5, Hash: 11, Binary Hash: 1011, Tail Length: 0
Element: 9, Hash: 19, Binary Hash: 10011, Tail Length: 0
Element: 2, Hash: 5, Binary Hash: 101, Tail Length: 0
Element: 6, Hash: 13, Binary Hash: 1

Bipartite


In [None]:
from collections import defaultdict

# Greedy algorithm for bipartite matching
class BipartiteMatcher:
    def __init__(self, U, V, edges):
        # U and V are the sets of vertices in the bipartite graph
        # edges is a list of tuples representing edges between U and V
        self.U = U
        self.V = V
        self.edges = edges
        self.matching = set()
        self.matched_U = set()
        self.matched_V = set()

    def greedy_match(self):
        for u, v in self.edges:
            # Check if both u and v are not already matched
            if u not in self.matched_U and v not in self.matched_V:
                # Add the edge to the matching
                self.matching.add((u, v))
                # Mark both vertices as matched
                self.matched_U.add(u)
                self.matched_V.add(v)

        return self.matching

# Define sets U and V
U = {1, 2, 3, 4}  # Set U
V = {'a', 'b', 'c', 'd'}  # Set V

# List of edges between U and V
edges = [(1, 'a'), (2, 'b'), (3, 'a'), (4, 'c'), (2, 'd')]

# Create BipartiteMatcher object and find maximal matching
matcher = BipartiteMatcher(U, V, edges)
matching = matcher.greedy_match()

print("Maximal Matching:", matching)


Maximal Matching: {(4, 'c'), (1, 'a'), (2, 'b')}
