### Bloom Filter

In [11]:
#bloom filter
# Initialize a Bloom filter with 18 bits (all set to 0)
bit_array_size = 18
bit_array = [0] * bit_array_size

# Define the two hash functions
def hash1(x):
    return (x + 1) % 17

def hash2(x):
    return (3 * x + 2) % 17

# Insert a value into the Bloom filter
def insert(value):
    # Apply both hash functions and set the corresponding bits
    pos1 = hash1(value) % bit_array_size
    pos2 = hash2(value) % bit_array_size
    bit_array[pos1] = 1
    bit_array[pos2] = 1

# Values to be inserted
values = [15, 23, 65]

# Insert all values into the Bloom filter
for value in values:
    insert(value)

# Print the final state of the bit array
print("Final Bloom filter bit array:")
print(bit_array)

def is_probably_present(value):
    pos1 = hash1(value) % bit_array_size
    pos2 = hash2(value) % bit_array_size
    return bit_array[pos1] and bit_array[pos2]

# Check if number is in the filter
if is_probably_present(29):
    print("29 is seen.")
else:
    print("29 is not seen.")

Final Bloom filter bit array:
[0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0]
29 is not seen.


### Flajolet Martin

In [4]:
def flajolet_martin(stream):
  R = 5 # as 2^R = 32
  max_zeroes = 0
  for x in stream:
    hash_value = bin((3*x + 7) % 32)[2:].zfill(R) # [2:] is to remove "0b" and zfill pads with 0s
    trailing_zeroes = len(hash_value) - len(hash_value.rstrip('0'))
    max_zeroes = max(max_zeroes, trailing_zeroes)
  return 2**max_zeroes

# Example usage:
stream = [3, 1, 4, 1, 5, 9, 2, 6, 5]
estimated_distinct = flajolet_martin(stream)
print(f"Estimated number of distinct elements: {estimated_distinct}")

Estimated number of distinct elements: 16


### Min Hashing

In [None]:
import pandas as pd

def k_shingles(text, k):
    words = text.split()
    shingles = set()
    for i in range(len(words) - k + 1):
        shingle = ' '.join(words[i:i + k])
        shingles.add(shingle)
    return shingles

# Define the hash functions
def hash_function_1(x):
    return (3 * x + 5) % 526

def hash_function_2(x):
    return (7 * x + 4) % 526

def hash_function_3(x):
    return (3 * x + 1) % 526

# File paths
file_paths = [
    "one.txt",
    "two.txt",
    "three.txt"
]

# Token size
k = 5

# Dictionary to hold shingles for each file
shingle_dict = {}

# Process each file
for file_path in file_paths:
    with open(file_path, 'r') as file:
        content = file.read()

    shingles = k_shingles(content, k)
    shingle_dict[file_path] = shingles

# Create a set of all unique shingles across all files
all_shingles = list(set.union(*[set(shingles) for shingles in shingle_dict.values()]))

# Initialize an incidence matrix
incidence_matrix = pd.DataFrame(0, index=all_shingles, columns=file_paths)

# Populate the incidence matrix
for file_path, shingles in shingle_dict.items():
    for shingle in shingles:
        incidence_matrix.at[shingle, file_path] = 1

# Function to compute signatures
def compute_signature(incidence_matrix):
    signatures = {file_path: [] for file_path in incidence_matrix.columns}

    for shingle in incidence_matrix.index:
        # Use the index of the shingle in the all_shingles list
        index = list(incidence_matrix.index).index(shingle)

        for file_path in incidence_matrix.columns:
            # Calculate hash values
            hash_value = [
                hash_function_1(index),
                hash_function_2(index),
                hash_function_3(index)
            ]
            # Append hash values if the shingle is present in the file
            if incidence_matrix.at[shingle, file_path] == 1:
                signatures[file_path].append(hash_value)

    # Find the minimum hash value for each hash function per file
    final_signatures = {}
    for file_path, values in signatures.items():
        if values:  # If there are any values
            final_signatures[file_path] = [
                min(value[0] for value in values),  # Min for Hash1
                min(value[1] for value in values),  # Min for Hash2
                min(value[2] for value in values)   # Min for Hash3
            ]
        else:
            final_signatures[file_path] = [None, None, None]  # Handle case with no shingles

    return pd.DataFrame(final_signatures, index=['Hash1', 'Hash2', 'Hash3']).T

# Compute signatures
signature_matrix = compute_signature(incidence_matrix)

# Display the incidence matrix and signature matrix
print("Incidence Matrix:")
print(incidence_matrix)

print("\nSignature Matrix:")
print(signature_matrix)

In [1]:
import numpy as np

# Data matrix: each row represents an element's membership in the sets S1, S2, S3, S4.
data = np.array([
    [1, 0, 0, 1],  # a
    [0, 0, 1, 0],  # b
    [0, 1, 0, 1],  # c
    [1, 1, 0, 0],  # d
    [0, 1, 1, 0]   # e
])

# Define the hash functions
def h1(x):
    return (x + 1) % 5

def h2(x):
    return (3 * x + 1) % 5

# Number of hash functions and rows (elements)
num_hashes = 2
num_elements = data.shape[0]

# Initialize the signature matrix with infinity values
signature_matrix = np.full((num_hashes, data.shape[1]), np.inf)

# Compute the signature matrix
for row in range(num_elements):
    hashes = [h1(row), h2(row)]
    for col in range(data.shape[1]):
        if data[row, col] == 1:  # If the element is present in the set
            for i in range(num_hashes):
                signature_matrix[i, col] = min(signature_matrix[i, col], hashes[i])

# Display the resulting signature matrix
print("Signature Matrix:")
print(signature_matrix)

Signature Matrix:
[[1. 0. 0. 1.]
 [0. 0. 3. 1.]]


### KShingles

In [None]:
!pip install kshingle

In [2]:
from collections import deque

def create_k_shingles(filepath, k):
  shingles = []
  current_shingle = deque(maxlen=k)
  with open(filepath, 'r') as f:
    for line in f:
      tokens = line.strip().split()
      for token in tokens:
        current_shingle.append(token)
        if len(current_shingle) == k:
          shingles.append(tuple(current_shingle))
  return shingles

filepath = "one.txt"
k = 5
shingles = create_k_shingles(filepath, k)
shingles

[('Neural', 'networks,', 'or', 'artificial', 'neural'),
 ('networks,', 'or', 'artificial', 'neural', 'networks,'),
 ('or', 'artificial', 'neural', 'networks,', 'attempt'),
 ('artificial', 'neural', 'networks,', 'attempt', 'to'),
 ('neural', 'networks,', 'attempt', 'to', 'mimic'),
 ('networks,', 'attempt', 'to', 'mimic', 'the'),
 ('attempt', 'to', 'mimic', 'the', 'human'),
 ('to', 'mimic', 'the', 'human', 'brain'),
 ('mimic', 'the', 'human', 'brain', 'through'),
 ('the', 'human', 'brain', 'through', 'a'),
 ('human', 'brain', 'through', 'a', 'combination'),
 ('brain', 'through', 'a', 'combination', 'of'),
 ('through', 'a', 'combination', 'of', 'data'),
 ('a', 'combination', 'of', 'data', 'inputs,'),
 ('combination', 'of', 'data', 'inputs,', 'weights'),
 ('of', 'data', 'inputs,', 'weights', 'and'),
 ('data', 'inputs,', 'weights', 'and', 'biasâ€”all'),
 ('inputs,', 'weights', 'and', 'biasâ€”all', 'acting'),
 ('weights', 'and', 'biasâ€”all', 'acting', 'as'),
 ('and', 'biasâ€”all', 'acting',

### AMS

In [2]:
def ams_algorithm(stream, x_values):
    n = len(stream)
    total_sum = 0

    # Ensure x_values are within the range of the stream
    valid_x_values = [x for x in x_values if 0 <= x <= n]

    for index in valid_x_values:
        # Convert 1-based index to 0-based index
        zero_based_index = index - 1

        # Access the value at the given index in the stream
        x_j = stream[zero_based_index]

        # Count occurrences of x_j in the stream from position `zero_based_index` onward
        count = sum(1 for x in stream[zero_based_index:] if x == x_j)

        # Print the count for debugging purposes
        print(f"Value at position {index} (1-based) is {x_j} and its count from position onward is {count}")

        # Calculate the contribution of this x_j to the estimate
        estimate = n * (2 * count - 1)
        total_sum += estimate

    # Average the estimates from all x_values
    if len(valid_x_values) > 0:
        surprise_number = total_sum / len(valid_x_values)
    else:
        surprise_number = 0

    return surprise_number

# Example stream and x values
stream = [3, 1, 4, 1, 3, 4, 2, 1, 2]
x_values = [1, 3, 5]  # Given positions in the problem statement

# Calculate the surprise number
surprise_number = ams_algorithm(stream, x_values)

print(f"The surprise number of the stream is: {surprise_number}")

Value at position 1 (1-based) is 3 and its count from position onward is 2
Value at position 3 (1-based) is 4 and its count from position onward is 2
Value at position 5 (1-based) is 3 and its count from position onward is 1
The surprise number of the stream is: 21.0


### Bipartite Matching

In [None]:
!pip install networkx matplotlib

from collections import defaultdict

# Greedy algorithm for bipartite matching
class BipartiteMatcher:
    def __init__(self, U, V, edges):
        # U and V are the sets of vertices in the bipartite graph
        # edges is a list of tuples representing edges between U and V
        self.U = U
        self.V = V
        self.edges = edges
        self.matching = set()
        self.matched_U = set()
        self.matched_V = set()

    def greedy_match(self):
        for u, v in self.edges:
            # Check if both u and v are not already matched
            if u not in self.matched_U and v not in self.matched_V:
                # Add the edge to the matching
                self.matching.add((u, v))
                # Mark both vertices as matched
                self.matched_U.add(u)
                self.matched_V.add(v)

        return self.matching

U = {1, 2, 3, 4}  # Set U
V = {'a', 'b', 'c', 'd'}  # Set V

# List of edges between U and V
edges = [(1, 'a'), (2, 'b'), (3, 'a'), (4, 'c'), (2, 'd')]

matcher = BipartiteMatcher(U, V, edges)
matching = matcher.greedy_match()

print("Maximal Matching:", matching)

import networkx as nx
import matplotlib.pyplot as plt

# Assuming 'matching' is the output from your BipartiteMatcher
matching = [(1, 'a'), (2, 'b'), (4, 'c')]  # Replace with your actual matching

# Create a bipartite graph
G = nx.Graph()
G.add_nodes_from(U, bipartite=0)  # Add nodes from set U
G.add_nodes_from(V, bipartite=1)  # Add nodes from set V
G.add_edges_from(edges)  # Add all edges

# Create a subgraph with only the matched edges
matching_edges = list(matching)
G_matching = G.edge_subgraph(matching_edges)

# Draw the graph
pos = nx.bipartite_layout(G, U)  # Separate U and V nodes for better visualization
nx.draw(G, pos, with_labels=True, node_color=['lightblue' if node in U else 'lightgreen' for node in G.nodes()])
nx.draw_networkx_edges(G_matching, pos, edge_color='red', width=2)  # Highlight matching edges
plt.show()

### Social Network

In [None]:
pip install pandas networkx


import pandas as pd
import networkx as nx

# Load the dataset from a CSV file
# Assuming the CSV has two columns: 'source' and 'target' representing edges between nodes
csv_file = 'Social_Network_Ads.csv'
df = pd.read_csv(csv_file)
df.head()






# Create an empty graph
G = nx.Graph()

# Add edges from the CSV file to the graph
# Ensure that your CSV contains 'source' and 'target' columns
for index, row in df.iterrows():
    G.add_edge(row['User ID'], row['EstimatedSalary'])

# Calculate centrality measures
degree_centrality = nx.degree_centrality(G)
weighted_degree_centrality = {node: val * 100 for node, val in degree_centrality.items()}  # Placeholder for weighted degree
pagerank = nx.pagerank(G)
betweenness_centrality = nx.betweenness_centrality(G)

# Sort the centralities by degree as in your image example
degree_sorted = sorted(degree_centrality.items(), key=lambda x: x[1], reverse=True)
weighted_sorted = sorted(weighted_degree_centrality.items(), key=lambda x: x[1], reverse=True)
pagerank_sorted = sorted(pagerank.items(), key=lambda x: x[1], reverse=True)
betweenness_sorted = sorted(betweenness_centrality.items(), key=lambda x: x[1], reverse=True)

# Create a DataFrame to store centrality measures in a tabular format
data = {
    'Degree Name': [x[0] for x in degree_sorted],
    'Degree Score': [round(x[1]*100) for x in degree_sorted],  # Rounded for display
    'Weighted Degree Name': [x[0] for x in weighted_sorted],
    'Weighted Degree Score': [round(x[1]) for x in weighted_sorted],  # Assumed weighted values
    'Pagerank Name': [x[0] for x in pagerank_sorted],
    'Pagerank Score': [round(x[1], 3) for x in pagerank_sorted],  # 3 decimal places
    'Betweenness Name': [x[0] for x in betweenness_sorted],
    'Betweenness Score': [round(x[1], 2) for x in betweenness_sorted],  # 2 decimal places
}

# Create DataFrame
df_centrality = pd.DataFrame(data)

# Print the table
print(df_centrality.to_string(index=False))






import networkx as nx
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import matplotlib.pyplot as plt

# Load the dataset from the CSV file (update the file path accordingly)
file_path = 'Social_Network_Ads.csv'
df = pd.read_csv(file_path)

# Normalize 'EstimatedSalary' to compare users
df['EstimatedSalary_norm'] = (df['EstimatedSalary'] - df['EstimatedSalary'].mean()) / df['EstimatedSalary'].std()

# Create a feature matrix with EstimatedSalary
feature_matrix = df[['EstimatedSalary_norm']].values

# Calculate similarity between users using cosine similarity
similarity_matrix = cosine_similarity(feature_matrix)

# Create a Graph
G = nx.Graph()

# Add edges based on similarity (use a threshold to avoid connecting everyone)
threshold = 0.8  # Only consider strong similarities
for i, user1 in enumerate(df['User ID']):
    for j, user2 in enumerate(df['User ID']):
        if i != j and similarity_matrix[i, j] > threshold:
            G.add_edge(user1, user2, weight=similarity_matrix[i, j])

# Print basic graph info manually
print(f"Number of nodes: {G.number_of_nodes()}")
print(f"Number of edges: {G.number_of_edges()}")

# Degree statistics
degrees = dict(G.degree())
print(f"Average degree: {sum(degrees.values()) / len(degrees):.4f}")

# Subgraph for users with more than a certain number of connections
subG = G.subgraph([n for n in G.nodes if G.degree(n) > 2])

# Draw the subnetwork
pos = nx.spring_layout(subG, weight='weight', iterations=20, k=4)

plt.figure(figsize=(12, 10))
plt.axis('off')
plt.title('User Network based on Estimated Salary', fontsize=24)

# Draw nodes
node_sizes = [100 * subG.degree(node) ** 0.5 for node in subG.nodes()]
nx.draw_networkx_nodes(subG, pos, node_size=node_sizes, node_color='#009fe3')

# Draw edges with varying widths based on weight
for e in subG.edges(data=True):
    nx.draw_networkx_edges(subG, pos, edgelist=[e], width=e[2]['weight'] * 0.2, edge_color='#707070')  # Adjust width scaling if needed

# Add labels
nx.draw_networkx_labels(subG, pos, font_size=10)

# Show plot
plt.show()

### BFS

In [None]:
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np

# Load the Excel file
data = pd.read_csv('Movie.csv')

# Create a graph
G = nx.Graph()

# Add nodes and edges from the Excel data
for index, row in data.iterrows():
    person = row['person']
    movie = row['movie']  # Assuming your columns are named 'Name' and 'Movie'

    # Add nodes for person and movie
    G.add_node(person, type='person')
    G.add_node(movie, type='movie')

    # Add an edge between the person and the movie
    G.add_edge(person, movie)

# Color mapping based on node type
color_map = {'person': 'skyblue', 'movie': 'orange'}

# Assign colors
node_colors = [color_map[data['type']] if data['type'] in color_map else 'lightgray' for node, data in G.nodes(data=True)]

# Calculate node positions with a layout that spaces them out
pos = nx.spring_layout(G, k=0.3, iterations=20)  # Adjust k for more space between nodes

# Draw nodes with color
nx.draw_networkx_nodes(G, pos, node_color=node_colors, node_size=700, alpha=0.8)

# Draw edges
nx.draw_networkx_edges(G, pos, edge_color='gray', alpha=0.5)

# Labels
nx.draw_networkx_labels(G, pos, font_size=10, font_color='black')

# Display the graph
plt.figure(figsize=(15, 15))  # Adjust the figure size as necessary
plt.title('Network Graph of People and Movies')
plt.axis('off')  # Turn off the axis
plt.show()

### PipeLine

In [None]:
!pip install pyspark
!pip install findspark


import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"), \
    ("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL") \
  ]
columns=["firstname","lastname","country","state"]
df=spark.createDataFrame(data=data,schema=columns)
df.show()
print(df.collect())

states1=df.rdd.map(lambda x: x[3]).collect()
print(states1)
#['CA', 'NY', 'CA', 'FL']
from collections import OrderedDict
res = list(OrderedDict.fromkeys(states1))
print(res)
#['CA', 'NY', 'FL']


#Example 2
states2=df.rdd.map(lambda x: x.state).collect()
print(states2)
#['CA', 'NY', 'CA', 'FL']

states3=df.select(df.state).collect()
print(states3)
#[Row(state='CA'), Row(state='NY'), Row(state='CA'), Row(state='FL')]

states4=df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)
#['CA', 'NY', 'CA', 'FL']

states5=df.select(df.state).toPandas()['state']
states6=list(states5)
print(states6)
#['CA', 'NY', 'CA', 'FL']

pandDF=df.select(df.state,df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))

### Diabetes

In [None]:
#Piplene - Diabetes

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.types as tp

# Create a Spark session
spark = SparkSession.builder \
    .appName("DiabetesPredictionPipeline") \
    .getOrCreate()

data = pd.read_csv('diabetes.csv')
data
data_replaced = data.replace(0, np.nan)
data_replaced

# Read the CSV file
my_data = data_replaced

# Define the schema for the data
my_schema = tp.StructType([
    tp.StructField(name='Pregnancies', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Glucose', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BloodPressure', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='SkinThickness', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Insulin', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BMI', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='DiabetesPedigreeFunction', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='Age', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Outcome', dataType=tp.IntegerType(), nullable=True)
])

# Read the data again with the defined schema
my_data = spark.read.csv('diabetes.csv', schema=my_schema, header=True)

# Print the schema
my_data.printSchema()

# Define stages for the pipeline
imputer = Imputer(
    inputCols=my_data.columns,
    outputCols=["{}_imputed".format(c) for c in my_data.columns]
).setStrategy("median")

assembler = VectorAssembler(
    inputCols=['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
               'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'],
    outputCol='features'
)

lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10)

# Create the pipeline
pipeline = Pipeline(stages=[imputer, assembler, lr])

# Split the data into training and test sets
xtrain, xtest = my_data.randomSplit([0.7, 0.3])

# Fit the pipeline on training data
pipeline_model = pipeline.fit(xtrain)

# Make predictions on the test data
predictions = pipeline_model.transform(xtest)

# Evaluate the model
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})

print(f"Accuracy: {accuracy}")

# Stop the Spark session
spark.stop()

### PySpark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Initialize Spark session
spark = SparkSession.builder.appName("CRUD Operations with TempView").getOrCreate()

# Manually creating a DataFrame using Row
data = [
    Row(ID=1, Name="Alice", Age=25),
    Row(ID=2, Name="Bob", Age=30),
    Row(ID=3, Name="Charlie", Age=35)
]

# Create DataFrame
df_manual = spark.createDataFrame(data)

# Create a temporary view for SQL queries
df_manual.createOrReplaceTempView("people")

# Select records where Age is greater than 25
filtered_records = spark.sql("SELECT * FROM people WHERE Age > 25")
filtered_records.show()

# Insert new records
new_data = [
    Row(ID=4, Name="David", Age=40),
    Row(ID=5, Name="Eva", Age=28)
]
df_new = spark.createDataFrame(new_data)
df_combined = df_manual.union(df_new)
df_combined.createOrReplaceTempView("people_combined")
spark.sql("SELECT * FROM people_combined").show()

# Update Age for the person with ID = 2
updated_query = """
SELECT ID, Name,
    CASE
        WHEN ID = 2 THEN 32
        ELSE Age
    END AS Age
FROM people_combined
"""
df_updated = spark.sql(updated_query)
df_updated.createOrReplaceTempView("people_updated")
spark.sql("SELECT * FROM people_updated").show()

# Delete records where ID = 1
df_deleted = spark.sql("SELECT * FROM people_updated WHERE ID != 1")
df_deleted.createOrReplaceTempView("people_updated")
spark.sql("SELECT * FROM people_updated").show()

In [None]:
import pyspark
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)

# Broadcast variable on filter

filteDf= df.where((df['state'].isin(broadcastStates.value)))

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
# Create SparkSession
spark = SparkSession.builder \
          .appName('SparkByExamples.com') \
          .getOrCreate()

simpleData = [("James","34","true","M","3000.6089"),
    ("Michael","33","true","F","3300.8067"),
    ("Robert","37","false","M","5000.5034")
  ]

columns = ["firstname","age","isGraduated","gender","salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

from pyspark.sql.functions import col,round,expr
df.withColumn("salary",df.salary.cast('double')).printSchema()
df.withColumn("salary",df.salary.cast(DoubleType())).printSchema()
df.withColumn("salary",col("salary").cast('double')).printSchema()

#df.withColumn("salary",round(df.salary.cast(DoubleType()),2)).show(truncate=False).printSchema()
df.selectExpr("firstname","isGraduated","cast(salary as double) salary").printSchema()

df.createOrReplaceTempView("CastExample")
spark.sql("SELECT firstname,isGraduated,DOUBLE(salary) as salary from CastExample").printSchema()


#df.select("firstname",expr(df.age),"isGraduated",col("salary").cast('float').alias("salary")).show()

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

dataCollect = deptDF.collect()

print(dataCollect)

dataCollect2 = deptDF.select("dept_name").collect()
print(dataCollect2)

for row in dataCollect:
    print(row['dept_name'] + "," +str(row['dept_id']))

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data=[("James","Bond","100",None),
      ("Ann","Varsa","200",'F'),
      ("Tom Cruise","XXX","400",''),
      ("Tom Brand",None,"400",'M')]
columns=["fname","lname","id","gender"]
df=spark.createDataFrame(data,columns)

#alias
from pyspark.sql.functions import expr
df.select(df.fname.alias("first_name"), \
          df.lname.alias("last_name"), \
          expr(" fname ||','|| lname").alias("fullName") \
   ).show()

#asc, desc
df.sort(df.fname.asc()).show()
df.sort(df.fname.desc()).show()

#cast
df.select(df.fname,df.id.cast("int")).printSchema()

#between
df.filter(df.id.between(100,300)).show()

#contains
df.filter(df.fname.contains("Cruise")).show()

#startswith, endswith()
df.filter(df.fname.startswith("T")).show()
df.filter(df.fname.endswith("Cruise")).show()

#eqNullSafe

#isNull & isNotNull
df.filter(df.lname.isNull()).show()
df.filter(df.lname.isNotNull()).show()

#like , rlike
df.select(df.fname,df.lname,df.id) \
  .filter(df.fname.like("%om"))

#over

#substr
df.select(df.fname.substr(1,2).alias("substr")).show()

#when & otherwise
from pyspark.sql.functions import when
df.select(df.fname,df.lname,when(df.gender=="M","Male") \
              .when(df.gender=="F","Female") \
              .when(df.gender==None ,"") \
              .otherwise(df.gender).alias("new_gender") \
    ).show()

#isin
li=["100","200"]
df.select(df.fname,df.lname,df.id) \
  .filter(df.id.isin(li)) \
  .show()

from pyspark.sql.types import StructType,StructField,StringType,ArrayType,MapType
data=[(("James","Bond"),["Java","C#"],{'hair':'black','eye':'brown'}),
      (("Ann","Varsa"),[".NET","Python"],{'hair':'brown','eye':'black'}),
      (("Tom Cruise",""),["Python","Scala"],{'hair':'red','eye':'grey'}),
      (("Tom Brand",None),["Perl","Ruby"],{'hair':'black','eye':'blue'})]

schema = StructType([
        StructField('name', StructType([
            StructField('fname', StringType(), True),
            StructField('lname', StringType(), True)])),
        StructField('languages', ArrayType(StringType()),True),
        StructField('properties', MapType(StringType(),StringType()),True)
     ])
df=spark.createDataFrame(data,schema)
df.printSchema()
#getItem()
df.select(df.languages.getItem(1)).show()

df.select(df.properties.getItem("hair")).show()

#getField from Struct or Map
df.select(df.properties.getField("hair")).show()

df.select(df.name.getField("fname")).show()

#dropFields
#from pyspark.sql.functions import col
#df.withColumn("name1",col("name").dropFields(["fname"])).show()

#withField
#from pyspark.sql.functions import lit
#df.withColumn("name",df.name.withField("fname",lit("AA"))).show()

#from pyspark.sql import Row
#from pyspark.sql.functions import lit
#df = spark.createDataFrame([Row(a=Row(b=1, c=2))])
#df.withColumn('a', df['a'].withField('b', lit(3))).select('a.b').show()

#from pyspark.sql import Row
#from pyspark.sql.functions import col, lit
#df = spark.createDataFrame([
#Row(a=Row(b=1, c=2, d=3, e=Row(f=4, g=5, h=6)))])
#df.withColumn('a', df['a'].dropFields('b')).show()

In [None]:
from pyspark.sql import SparkSession,Row
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

data=[("James",23),("Ann",40)]
df=spark.createDataFrame(data).toDF("name.fname","gender")
df.printSchema()
df.show()

from pyspark.sql.functions import col
df.select(col("`name.fname`")).show()
df.select(df["`name.fname`"]).show()
df.withColumn("new_col",col("`name.fname`").substr(1,2)).show()
df.filter(col("`name.fname`").startswith("J")).show()
new_cols=(column.replace('.', '_') for column in df.columns)
df2 = df.toDF(*new_cols)
df2.show()


# Using DataFrame object
df.select(df.gender).show()
df.select(df["gender"]).show()
#Accessing column name with dot (with backticks)
df.select(df["`name.fname`"]).show()

#Using SQL col() function
from pyspark.sql.functions import col
df.select(col("gender")).show()
#Accessing column name with dot (with backticks)
df.select(col("`name.fname`")).show()

#Access struct column
data=[Row(name="James",prop=Row(hair="black",eye="blue")),
      Row(name="Ann",prop=Row(hair="grey",eye="black"))]
df=spark.createDataFrame(data)
df.printSchema()

df.select(df.prop.hair).show()
df.select(df["prop.hair"]).show()
df.select(col("prop.hair")).show()
df.select(col("prop.*")).show()

# Column operators
data=[(100,2,1),(200,3,4),(300,4,4)]
df=spark.createDataFrame(data).toDF("col1","col2","col3")
df.select(df.col1 + df.col2).show()
df.select(df.col1 - df.col2).show()
df.select(df.col1 * df.col2).show()
df.select(df.col1 / df.col2).show()
df.select(df.col1 % df.col2).show()

df.select(df.col2 > df.col3).show()
df.select(df.col2 < df.col3).show()
df.select(df.col2 == df.col3).show()

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

dataDictionary = [
        ('James',{'hair':'black','eye':'brown'}),
        ('Michael',{'hair':'brown','eye':None}),
        ('Robert',{'hair':'red','eye':'black'}),
        ('Washington',{'hair':'grey','eye':'grey'}),
        ('Jefferson',{'hair':'brown','eye':''})
        ]

df = spark.createDataFrame(data=dataDictionary, schema = ['name','properties'])
df.printSchema()
df.show(truncate=False)

df3=df.rdd.map(lambda x: \
    (x.name,x.properties["hair"],x.properties["eye"])) \
    .toDF(["name","hair","eye"])
df3.printSchema()
df3.show()

df.withColumn("hair",df.properties.getItem("hair")) \
  .withColumn("eye",df.properties.getItem("eye")) \
  .drop("properties") \
  .show()

df.withColumn("hair",df.properties["hair"]) \
  .withColumn("eye",df.properties["eye"]) \
  .drop("properties") \
  .show()

# Functions
from pyspark.sql.functions import explode,map_keys,col
keysDF = df.select(explode(map_keys(df.properties))).distinct()
keysList = keysDF.rdd.map(lambda x:x[0]).collect()
keyCols = list(map(lambda x: col("properties").getItem(x).alias(str(x)), keysList))
df.select(df.name, *keyCols).show()

In [None]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType,StructField, StringType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

#Using List
dept = [("Finance",10),
        ("Marketing",20),
        ("Sales",30)
      ]

deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

deptSchema = StructType([
    StructField('firstname', StringType()),
    StructField('middlename', StringType())
])

deptDF1 = spark.createDataFrame(data=dept, schema = deptSchema)
deptDF1.printSchema()
deptDF1.show(truncate=False)

# Using list of Row type
dept2 = [Row("Finance",10),
        Row("Marketing",20),
        Row("Sales",30),
        Row("IT",40)
      ]

deptDF2 = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF2.printSchema()
deptDF2.show(truncate=False)

# Convert list to RDD
rdd = spark.sparkContext.parallelize(dept)

In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkByExamples.com') \
        .master("local[5]").getOrCreate()

df=spark.range(0,20)
print(df.rdd.getNumPartitions())

df.write.mode("overwrite").csv("c:/tmp/partition.csv")

df2 = df.repartition(6)
print(df2.rdd.getNumPartitions())

df3 = df.coalesce(2)
print(df3.rdd.getNumPartitions())

df4 = df.groupBy("id").count()
print(df4.rdd.getNumPartitions())

In [None]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
               .appName('SparkByExamples.com') \
               .getOrCreate()

from pyspark.sql.functions import *

df=spark.createDataFrame([["1"]],["id"])
df.select(current_date().alias("current_date"), \
      date_format(current_date(),"yyyy MM dd").alias("yyyy MM dd"), \
      date_format(current_timestamp(),"MM/dd/yyyy hh:mm").alias("MM/dd/yyyy"), \
      date_format(current_timestamp(),"yyyy MMM dd").alias("yyyy MMMM dd"), \
      date_format(current_timestamp(),"yyyy MMMM dd E").alias("yyyy MMMM dd E") \
   ).show()

#SQL

spark.sql("select current_date() as current_date, "+
      "date_format(current_timestamp(),'yyyy MM dd') as yyyy_MM_dd, "+
      "date_format(current_timestamp(),'MM/dd/yyyy hh:mm') as MM_dd_yyyy, "+
      "date_format(current_timestamp(),'yyyy MMM dd') as yyyy_MMMM_dd, "+
      "date_format(current_timestamp(),'yyyy MMMM dd E') as yyyy_MMMM_dd_E").show()

In [None]:
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
               .appName('SparkByExamples.com') \
               .getOrCreate()
data=[["1","2020-02-01"],["2","2019-03-01"],["3","2021-03-01"]]
df=spark.createDataFrame(data,["id","input"])
df.show()

from pyspark.sql.functions import *

#current_date()
df.select(current_date().alias("current_date")
  ).show(1)

#date_format()
df.select(col("input"),
    date_format(col("input"), "MM-dd-yyyy").alias("date_format")
  ).show()

#to_date()
df.select(col("input"),
    to_date(col("input"), "yyy-MM-dd").alias("to_date")
  ).show()

#datediff()
df.select(col("input"),
    datediff(current_date(),col("input")).alias("datediff")
  ).show()

#months_between()
df.select(col("input"),
    months_between(current_date(),col("input")).alias("months_between")
  ).show()

#trunc()
df.select(col("input"),
    trunc(col("input"),"Month").alias("Month_Trunc"),
    trunc(col("input"),"Year").alias("Month_Year"),
    trunc(col("input"),"Month").alias("Month_Trunc")
   ).show()

#add_months() , date_add(), date_sub()

df.select(col("input"),
    add_months(col("input"),3).alias("add_months"),
    add_months(col("input"),-3).alias("sub_months"),
    date_add(col("input"),4).alias("date_add"),
    date_sub(col("input"),4).alias("date_sub")
  ).show()

#

df.select(col("input"),
     year(col("input")).alias("year"),
     month(col("input")).alias("month"),
     next_day(col("input"),"Sunday").alias("next_day"),
     weekofyear(col("input")).alias("weekofyear")
  ).show()

df.select(col("input"),
     dayofweek(col("input")).alias("dayofweek"),
     dayofmonth(col("input")).alias("dayofmonth"),
     dayofyear(col("input")).alias("dayofyear"),
  ).show()

data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

#current_timestamp()
df2.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

#to_timestamp()
df2.select(col("input"),
    to_timestamp(col("input"), "MM-dd-yyyy HH mm ss SSS").alias("to_timestamp")
  ).show(truncate=False)


#hour, minute,second
data=[["1","2020-02-01 11:01:19.06"],["2","2019-03-01 12:01:19.406"],["3","2021-03-01 12:01:19.406"]]
df3=spark.createDataFrame(data,["id","input"])

df3.select(col("input"),
    hour(col("input")).alias("hour"),
    minute(col("input")).alias("minute"),
    second(col("input")).alias("second")
  ).show(truncate=False)

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
simpleData = (("James","","Smith","36636","NewYork",3100), \
    ("Michael","Rose","","40288","California",4300), \
    ("Robert","","Williams","42114","Florida",1400), \
    ("Maria","Anne","Jones","39192","Florida",5500), \
    ("Jen","Mary","Brown","34561","NewYork",3000) \
  )
columns= ["firstname","middlename","lastname","id","location","salary"]

df = spark.createDataFrame(data = simpleData, schema = columns)

df.printSchema()
df.show(truncate=False)

df.drop("firstname") \
  .printSchema()

df.drop(col("firstname")) \
  .printSchema()

df.drop(df.firstname) \
  .printSchema()

df.drop("firstname","middlename","lastname") \
    .printSchema()

cols = ("firstname","middlename","lastname")

df.drop(*cols) \
   .printSchema()

In [None]:
import os
# Set the SPARK_HOME environment variable
os.environ['SPARK_HOME'] = '/path/to/your/spark/installation'
# Example: os.environ['SPARK_HOME'] = '/opt/spark'

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"), \
    ("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL") \
  ]
columns=["firstname","lastname","country","state"]
df=spark.createDataFrame(data=data,schema=columns)
df.show()
print(df.collect())

states1=df.rdd.map(lambda x: x[3]).collect()
print(states1)
#['CA', 'NY', 'CA', 'FL']
from collections import OrderedDict
res = list(OrderedDict.fromkeys(states1))
print(res)
#['CA', 'NY', 'FL']


#Example 2
states2=df.rdd.map(lambda x: x.state).collect()
print(states2)
#['CA', 'NY', 'CA', 'FL']

states3=df.select(df.state).collect()
print(states3)
#[Row(state='CA'), Row(state='NY'), Row(state='CA'), Row(state='FL')]

states4=df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)
#['CA', 'NY', 'CA', 'FL']

states5=df.select(df.state).toPandas()['state']
states6=list(states5)
print(states6)
#['CA', 'NY', 'CA', 'FL']

pandDF=df.select(df.state,df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))

In [None]:

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","Smith","USA","CA"),("Michael","Rose","USA","NY"), \
    ("Robert","Williams","USA","CA"),("Maria","Jones","USA","FL") \
  ]
columns=["firstname","lastname","country","state"]
df=spark.createDataFrame(data=data,schema=columns)
df.show()
print(df.collect())

states1=df.rdd.map(lambda x: x[3]).collect()
print(states1)
#['CA', 'NY', 'CA', 'FL']
from collections import OrderedDict
res = list(OrderedDict.fromkeys(states1))
print(res)
#['CA', 'NY', 'FL']


#Example 2
states2=df.rdd.map(lambda x: x.state).collect()
print(states2)
#['CA', 'NY', 'CA', 'FL']

states3=df.select(df.state).collect()
print(states3)
#[Row(state='CA'), Row(state='NY'), Row(state='CA'), Row(state='FL')]

states4=df.select(df.state).rdd.flatMap(lambda x: x).collect()
print(states4)
#['CA', 'NY', 'CA', 'FL']

states5=df.select(df.state).toPandas()['state']
states6=list(states5)
print(states6)
#['CA', 'NY', 'CA', 'FL']

pandDF=df.select(df.state,df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark: SparkSession = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

data = [
    ("James",None,"M"),
    ("Anna","NY","F"),
    ("Julia",None,None)
]

columns = ["name","state","gender"]
df =spark.createDataFrame(data,columns)

df.printSchema()
df.show()

df.filter("state is NULL").show()
df.filter(df.state.isNull()).show()
df.filter(col("state").isNull()).show()

df.filter("state IS NULL AND gender IS NULL").show()
df.filter(df.state.isNull() & df.gender.isNull()).show()

df.filter("state is not NULL").show()
df.filter("NOT state is NULL").show()
df.filter(df.state.isNotNull()).show()
df.filter(col("state").isNotNull()).show()
df.na.drop(subset=["state"]).show()

df.createOrReplaceTempView("DATA")
spark.sql("SELECT * FROM DATA where STATE IS NULL").show()
spark.sql("SELECT * FROM DATA where STATE IS NULL AND GENDER IS NULL").show()
spark.sql("SELECT * FROM DATA where STATE IS NOT NULL").show()

### Calculating nth moment

In [10]:
class StreamMoment:
    def __init__(self, n):
        self.n = n      # n-th moment
        self.count = 0   # Count of elements
        self.mean = 0    # Running mean
        self.moment_n = 0 # n-th moment

    def update(self, value):
        # Increment count of elements
        self.count += 1

        # Compute new mean
        delta = value - self.mean
        self.mean += delta / self.count

        # Update the n-th moment
        self.moment_n += (delta ** self.n) * ((self.count - 1) / self.count)

    def get_moment(self):
        # Return the n-th moment divided by the number of elements
        return self.moment_n / self.count if self.count > 0 else 0

# n-th moment:
stream_moment = StreamMoment(n=1)

# Simulating a stream of data
data_stream = [1, 0, 0, 1, 1, 1, 0]
for data in data_stream:
    stream_moment.update(data)

# Get the n-th moment
print(f"Moment: {stream_moment.get_moment()}")

Moment: -0.024489795918367325


### PCY

In [1]:
from collections import defaultdict
from itertools import combinations
import hashlib
def hash_pair(pair, num_buckets):
    return int(hashlib.md5(str(pair).encode()).hexdigest(), 16) % num_buckets
def count_items(transactions):
    item_count = defaultdict(int)
    for transaction in transactions:
        for item in transaction:
            item_count[item] += 1
    return item_count
def count_pairs(transactions, frequent_items, num_buckets):
    bucket = [0] * num_buckets
    pair_count = defaultdict(int)

    for transaction in transactions:
        transaction = [item for item in transaction if item in frequent_items]
        for pair in combinations(transaction, 2):
            bucket[hash_pair(pair, num_buckets)] += 1

    for transaction in transactions:
        transaction = [item for item in transaction if item in frequent_items]
        for pair in combinations(transaction, 2):
            if bucket[hash_pair(pair, num_buckets)] >= 2:  # If the bucket is frequent
                pair_count[pair] += 1

    return pair_count
def pcy_algorithm(transactions, min_support, num_buckets):
    # First pass: count individual items and hash pairs into buckets
    item_count = count_items(transactions)
    frequent_items = {item for item, count in item_count.items() if count >= min_support}

    # Second pass: count pairs using the frequent items and buckets
    pair_count = count_pairs(transactions, frequent_items, num_buckets)
    frequent_pairs = {pair for pair, count in pair_count.items() if count >= min_support}

    return frequent_items, frequent_pairs
transactions = [
    ['1','2','3'],
    ['4', '5'],
    ['1','4', '5'],
    ['1', '2', '4'],
    ['3','4' ,'5'],
    ['2','4','5']
]

min_support = 2
num_buckets = 5

frequent_items, frequent_pairs = pcy_algorithm(transactions, min_support, num_buckets)

print("Frequent Items:", frequent_items)
print("Frequent Pairs:", frequent_pairs)

Frequent Items: {'4', '2', '1', '5', '3'}
Frequent Pairs: {('2', '4'), ('4', '5'), ('1', '2'), ('1', '4')}


In [None]:
from itertools import combinations
from collections import defaultdict, Counter
import pandas as pd

def hash_function(itemset, size):
    """ Simple hash function to distribute itemsets over a hash table. """
    return hash(itemset) % size

def get_frequent_itemsets(transactions, min_support, hash_size):
    """ Generate frequent itemsets using the PCY algorithm. """
    # First pass: Count 1-itemsets and hash pairs
    item_counts = Counter()
    pair_counts = defaultdict(int)
    hash_table = [0] * hash_size

    # Count 1-itemsets and hash pairs
    for transaction in transactions:
        # Convert items to strings for consistent comparison
        transaction = [str(item) for item in transaction]
        for item in transaction:
            item_counts[item] += 1
        for itemset in combinations(sorted(transaction), 2): # Now sorting will work correctly
            pair_counts[itemset] += 1
            hash_index = hash_function(itemset, hash_size)
            hash_table[hash_index] += 1

    # Filter frequent 1-itemsets
    frequent_items = {item for item, count in item_counts.items() if count >= min_support}

    # Second pass: Use hash table to count candidate 2-itemsets
    frequent_pairs = defaultdict(int)
    for transaction in transactions:
        # Convert items to strings for consistent comparison
        transaction_items = [str(item) for item in transaction if str(item) in frequent_items]
        for itemset in combinations(sorted(transaction_items), 2):
            hash_index = hash_function(itemset, hash_size)
            if hash_table[hash_index] >= min_support:
                frequent_pairs[itemset] += 1

    # Filter frequent 2-itemsets
    frequent_pairs = {itemset: count for itemset, count in frequent_pairs.items() if count >= min_support}

    return frequent_items, frequent_pairs

# Example usage
if __name__ == "__main__":
    transactions = pd.read_csv("/content/trans.csv")
    transactions = transactions.values.tolist()

    min_support = 2
    hash_size = 10  # Size of hash table

    frequent_items, frequent_pairs = get_frequent_itemsets(transactions, min_support, hash_size)

    print("Frequent Itemsets:", frequent_items)
    print("Frequent Pairs:", frequent_pairs)

### Decaying Window

In [None]:
def decaying_window(tags, alpha):
    # Initialize an empty dictionary to store decaying counts
    decaying_counts = {}

    for tag in tags:
        # Update decaying counts for all tags
        for key in decaying_counts.keys():
            decaying_counts[key] *= (1 - alpha)

        # Increment the count for the current tag
        if tag in decaying_counts:
            decaying_counts[tag] += 1
        else:
            decaying_counts[tag] = 1

        # Print the decaying counts after processing each tag
        print(f"After '{tag}': {decaying_counts}")

    return decaying_counts

# Example sequence of Twitter tags
tags = ["fifa", "ipl", "fifa", "ipl", "ipl", "ipl", "fifa"]
alpha = 0.1  # Decay factor

# Calculate decaying counts
final_decaying_counts = decaying_window(tags, alpha)

# Final decaying counts after processing all tags
print("\nFinal Decaying Counts:", final_decaying_counts)