In [None]:
import pandas as pd
import random
import string
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, LongType,
    FloatType, DoubleType, BooleanType, DateType, TimestampType, ArrayType
)
# Initialize SparkSession
# spark = SparkSession.builder \
#     .appName("ExcelMetadataWorkflow") \
#     .getOrCreate()

In [None]:
!pip install openpyxl

In [None]:
# -------------------------------------
# Step 1. Load every worksheet of the workbook.
# -------------------------------------
excel_path = "/tmp/HPE_NVDA_datagen.xlsx"  # update this path

# With sheet_name=None pandas returns a dict {sheet name: DataFrame}
# covering every sheet in the workbook.
sheets = pd.read_excel(excel_path, sheet_name=None)
sheet_names = [*sheets]
print("Found sheets:", sheet_names)

In [None]:
# -------------------------------------
# Step 2. Process the tables overview (first sheet)
# -------------------------------------
# The first sheet has one row per table: its masked identifier and an
# approximate row count. Rename the column keys below if your workbook differs.
tables_overview_df = sheets[sheet_names[0]]
table_names = tables_overview_df["masked_table_id"].to_list()
approx_row_counts = tables_overview_df["num_rows_approx"].to_list()

print("Tables and approximate row counts:")
for overview_name, overview_rows in zip(table_names, approx_row_counts):
    print(f"  {overview_name}: ~{overview_rows} rows")


In [None]:
# -------------------------------------
# Step 3. Read each table's metadata (columns, types, etc.)
# -------------------------------------
# Each table is expected to have its own sheet named exactly like the table.
# Tables without such a sheet are reported and left out of table_metadata.
table_metadata = {}
for tbl in table_names:
    meta_df = sheets.get(tbl)
    if meta_df is None:
        print(f"Warning: No metadata sheet found for table '{tbl}'.")
        continue
    table_metadata[tbl] = meta_df
    print(f"Loaded metadata for table '{tbl}'.")

In [None]:
# -------------------------------------
# Step 4. Define a mapping from your Excel type names to Spark types.
# -------------------------------------
# Scalar types are accepted both in constructor form ("IntegerType()") and as
# the bare class name ("IntegerType"). Array types must be spelled exactly as
# the keys below, including the ", True" nullability flag.
spark_type_mapping = {
    "StringType()": StringType(),
    "StringType": StringType(),
    "IntegerType()": IntegerType(),
    "IntegerType": IntegerType(),
    "LongType()": LongType(),
    "LongType": LongType(),
    "FloatType()": FloatType(),
    "FloatType": FloatType(),
    "DoubleType()": DoubleType(),
    "DoubleType": DoubleType(),
    "BooleanType()": BooleanType(),
    "BooleanType": BooleanType(),
    "DateType()": DateType(),
    "DateType": DateType(),
    "TimestampType()": TimestampType(),
    "TimestampType": TimestampType(),
    "ArrayType(IntegerType(), True)": ArrayType(IntegerType(), True),
    "ArrayType(StringType(), True)": ArrayType(StringType(), True),
}

def create_schema(meta_df):
    """
    Build a Spark StructType from one table's metadata DataFrame.

    Columns read from ``meta_df``:
      * "masked_column_name": the field name.
      * "spark_data_type": a type name looked up in ``spark_type_mapping``
        (surrounding whitespace ignored). If the exact string is not a key,
        the same name with "()" appended is tried, so "IntegerType" works like
        "IntegerType()". A blank cell means StringType. Any other unknown name
        prints a warning and falls back to StringType.
      * "min" / "max" (optional columns): for IntegerType, LongType, FloatType
        and DoubleType fields, when both values are present on a row they are
        stored as the field's metadata {"min": ..., "max": ...}.

    Every field is created as nullable.

    Returns:
        StructType with one field per metadata row, in row order.
    """
    fields = []
    # The range columns are optional in the spreadsheet.
    has_range = ("min" in meta_df.columns) and ("max" in meta_df.columns)

    for _, row in meta_df.iterrows():
        col_name = row["masked_column_name"]
        raw_type = row["spark_data_type"]

        if pd.isna(raw_type):
            # A blank type cell means "string"; no warning is warranted.
            spark_type = StringType()
        else:
            type_str = str(raw_type).strip()
            spark_type = spark_type_mapping.get(type_str)
            if spark_type is None:
                # Accept the bare class name as well as the constructor form.
                spark_type = spark_type_mapping.get(f"{type_str}()")
            if spark_type is None:
                print(f"Warning: Unrecognized type '{raw_type}' for column '{col_name}'. Using StringType.")
                spark_type = StringType()

        md = {}
        # Numeric fields carry their optional value range for the data generator.
        if isinstance(spark_type, (IntegerType, LongType, FloatType, DoubleType)) and has_range:
            if pd.notna(row["min"]) and pd.notna(row["max"]):
                md["min"] = row["min"]
                md["max"] = row["max"]

        fields.append(StructField(col_name, spark_type, True, metadata=md))

    return StructType(fields)

# Build one Spark schema per table that has a metadata sheet, keyed by table name.
schemas = {}
for table_name, table_meta in table_metadata.items():
    schemas[table_name] = create_schema(table_meta)
    print(f"Schema for table '{table_name}': {schemas[table_name]}")

In [None]:
# -------------------------------------
# Step 5. Process join information.
# -------------------------------------
# The SECOND sheet (sheet_names[1]) holds the join definitions, one join per
# row, with columns "table1", "column1", "table2", "column2" and
# "join_method". A blank join_method defaults to "inner".
join_info_df = sheets[sheet_names[1]]
joins = []
for _, row in join_info_df.iterrows():
    join_method = row["join_method"]
    # Blank cells arrive as NaN (a float), which has no .upper(); default them.
    if pd.isna(join_method) or not str(join_method).strip():
        join_method = "inner"
    joins.append({
        "left_table": row["table1"],
        "right_table": row["table2"],
        "join_method": str(join_method).strip(),
        "left_column": row["column1"],
        "right_column": row["column2"],
    })

print("Join definitions:")
for join in joins:
    print(f"  {join['left_table']}.{join['left_column']} {join['join_method'].upper()} JOIN {join['right_table']}.{join['right_column']}")


In [None]:
# -------------------------------------
# (Optional) Step 6. Build a dynamic join query.
# -------------------------------------
# If you later load your data into Spark DataFrames and register them as temporary views,
# you could build and execute a join query dynamically. For example, suppose you have:
#    df_customers = spark.read.csv("customers.csv", schema=schemas["customers"], header=True)
#    df_customers.createOrReplaceTempView("customers")
#    ... and similarly for other tables.
#
# The query starts FROM the left table of the first join definition (falling
# back to the first overview table when there are no joins). It then appends
# each join in sheet order, so every join's left table must already be part
# of the query when that join is reached.
if table_names:
    base_table = joins[0]["left_table"] if joins else table_names[0]
    join_query = f"SELECT * FROM {base_table}"
    for join in joins:
        join_query += (
            f" {join['join_method'].upper()} JOIN {join['right_table']} "
            f"ON {join['left_table']}.{join['left_column']} = {join['right_table']}.{join['right_column']}"
        )
    print("Constructed join query:")
    print(join_query)
    # To execute the query once the tables are registered as temp views:
    # result_df = spark.sql(join_query)
    # result_df.show()

# -------------------------------------
# Now you have:
#  - 'table_names' and 'approx_row_counts' from the overview.
#  - 'table_metadata': a dictionary mapping table names to their metadata DataFrames.
#  - 'schemas': a dictionary mapping table names to Spark schemas.
#  - 'joins': a list of dictionaries describing the join relationships.
#
# You can now use this information to drive your Spark ETL/processing workflow.
#
# When finished, stop the Spark session (if running in a script).
# spark.stop()


# Generate Random Data

In [None]:
# ========================================
# PART 2: Generate random data for each table
# ========================================

def _random_column(dt, md):
    """
    Return a Column producing one random value of Spark type ``dt`` per row.

    ``md`` is the field's metadata dict. For numeric types, its optional
    "min"/"max" entries bound the generated values.
    """
    if isinstance(dt, (IntegerType, LongType)):
        lo = float(md.get("min", 1))
        hi = float(md.get("max", 1000))
        # floor(rand() * (hi - lo + 1) + lo) is uniform over the integers
        # lo..hi inclusive. Casting rand() * (hi - lo) + lo directly would never
        # yield hi, and it truncates negatives toward zero, over-weighting 0 in
        # ranges that cross it.
        value = F.floor(F.rand() * (hi - lo + 1.0) + lo)
        return value.cast("int" if isinstance(dt, IntegerType) else "long")

    if isinstance(dt, (FloatType, DoubleType)):
        lo = float(md.get("min", 0.0))
        hi = float(md.get("max", 1000.0))
        value = F.rand() * (hi - lo) + lo
        return value.cast("float" if isinstance(dt, FloatType) else "double")

    if isinstance(dt, BooleanType):
        return F.rand() > 0.5

    if isinstance(dt, DateType):
        # A date 0-8999 days after 2000-01-01.
        return F.expr("date_add('2000-01-01', cast(rand() * 9000 as int))")

    if isinstance(dt, TimestampType):
        # The start of a random date drawn as above; the time-of-day part is
        # not randomized.
        return F.expr("to_timestamp(date_add('2000-01-01', cast(rand() * 9000 as int)))")

    if isinstance(dt, StringType):
        return F.expr("uuid()")

    if isinstance(dt, ArrayType):
        # A one-element array whose element follows the rule for its own type,
        # cast so the column matches the declared array type exactly.
        return F.array(_random_column(dt.elementType, md)).cast(dt)

    # Unsupported type: NULL typed as declared. A bare lit(None) would have
    # NullType and not match the schema.
    return F.lit(None).cast(dt)


def generate_random_dataframe(schema, num_rows):
    """
    Generate a DataFrame of ``num_rows`` random rows that follows ``schema``.

    Rows come from ``spark.range(num_rows)``, so the data is produced by Spark
    rather than on the driver. Value rules per field type:
      * IntegerType / LongType: uniform integers in [min, max] inclusive, taken
        from the field metadata (default 1..1000).
      * FloatType / DoubleType: uniform in [min, max) (default [0.0, 1000.0)).
      * BooleanType: True/False with equal probability.
      * DateType: 2000-01-01 plus 0-8999 days.
      * TimestampType: a date as above converted to a timestamp.
      * StringType: uuid() strings.
      * ArrayType: a one-element array following the element type's rule.
      * anything else: NULL cast to the declared type.

    Args:
        schema: StructType describing the columns to generate.
        num_rows: number of rows.

    Returns:
        DataFrame whose columns are exactly the schema's fields, in order.
    """
    columns = [
        _random_column(field.dataType, field.metadata or {}).alias(field.name)
        for field in schema.fields
    ]
    # A single select() replaces one withColumn() per field, which the PySpark
    # docs warn builds large plans. Selecting only the generated columns also
    # discards spark.range's "id" without removing a schema field named "id".
    return spark.range(num_rows).select(*columns)

# Create one random DataFrame per table, stored in `dfs` by table name.
# ROW_COUNT_OVERRIDES replaces the overview's approximate row count for selected
# tables (scaled for testing). Set it to {} to use the overview counts for
# every table.
ROW_COUNT_OVERRIDES = {
    "table_a": 100_000_000,
    "table_c": 21_000_000,
}

dfs = {}
for tbl, count in zip(table_names, approx_row_counts):
    if tbl not in schemas:
        # Step 3 already warned that this table has no metadata sheet.
        print(f"Warning: No schema for table '{tbl}'; skipping data generation.")
        continue
    schema = schemas[tbl]
    num_rows = ROW_COUNT_OVERRIDES.get(tbl, count)
    if pd.isna(num_rows):
        print(f"Warning: No row count for table '{tbl}'; skipping data generation.")
        continue
    num_rows = int(num_rows)
    df = generate_random_dataframe(schema, num_rows)
    dfs[tbl] = df
    print(f"Created DataFrame for table '{tbl}' with {num_rows} random rows.")

# GroupBys

In [None]:
table_a = dfs["table_a"]
gb_test1 = table_a.groupBy("col_a_1", "col_a_3", "col_a_5", "col_a_7", "col_a_9").count()

# The "noop" sink executes the whole query but stores nothing: a timing run.
gb_test1.write.format("noop").mode("overwrite").save()

In [None]:
table_a = dfs["table_a"]
gb_test2 = table_a.groupBy("col_a_1").count()

gb_test2.write.format("noop").mode("overwrite").save()

# Joins

### Note: in the real data, table_c's `col_1` and `col_3` are both `ArrayType(IntegerType)` columns. Most arrays hold a single value, but some hold several. Handling this in the joins is left as an exercise.

In [None]:
table_a = dfs["table_a"]
table_b = dfs["table_b"]

# Left join table_a -> table_b on four key pairs.
join_test1 = table_a.join(
    table_b,
    on=[
        table_a["col_a_1"] == table_b["col_b_8"],
        table_a["col_a_3"] == table_b["col_b_3"],
        table_a["col_a_5"] == table_b["col_b_9"],
        table_a["col_a_7"] == table_b["col_b_1"],
    ],
    how="left",
)
join_test1.write.format("noop").mode("overwrite").save()

In [None]:
table_a = dfs["table_a"]
table_c = dfs["table_c"]

# Left join table_a -> table_c on three key pairs.
join_test2 = table_a.join(
    table_c,
    on=[
        table_a["col_a_1"] == table_c["col_c_10"],
        table_a["col_a_3"] == table_c["col_c_9"],
        table_a["col_a_9"] == table_c["col_c_11"],
    ],
    how="left",
)

join_test2.write.format("noop").mode("overwrite").save()

In [None]:
table_a = dfs["table_a"]
table_d = dfs["table_d"]

# Left join table_a -> table_d on two key pairs.
join_test3 = table_a.join(
    table_d,
    on=[
        table_a["col_a_1"] == table_d["col_d_0"],
        table_a["col_a_5"] == table_d["col_d_1"],
    ],
    how="left",
)

join_test3.write.format("noop").mode("overwrite").save()

In [None]:
table_a = dfs["table_a"]
table_e = dfs["table_e"]

# Single-key left join table_a -> table_e.
join_test4 = table_a.join(
    table_e,
    on=table_a["col_a_1"] == table_e["col_e_0"],
    how="left",
)

join_test4.write.format("noop").mode("overwrite").save()

In [None]:
table_c = dfs["table_c"]
table_e = dfs["table_e"]

# Single-key left join table_c -> table_e on col_c_5.
join_test5 = table_c.join(
    table_e,
    on=table_c["col_c_5"] == table_e["col_e_0"],
    how="left",
)

join_test5.write.format("noop").mode("overwrite").save()

In [None]:
table_c = dfs["table_c"]
table_e = dfs["table_e"]

# Single-key left join table_c -> table_e on col_c_10.
join_test6 = table_c.join(
    table_e,
    on=table_c["col_c_10"] == table_e["col_e_0"],
    how="left",
)

join_test6.write.format("noop").mode("overwrite").save()

In [None]:
# Chain all four left joins onto table_a. The inputs are looked up here so this
# cell does not depend on variables left behind by the individual join cells.
table_a = dfs['table_a']
table_b = dfs['table_b']
table_c = dfs['table_c']
table_d = dfs['table_d']
table_e = dfs['table_e']

linked_join_test = (
    table_a
    .join(
        table_b,
        [
            table_a["col_a_1"] == table_b["col_b_8"],
            table_a["col_a_3"] == table_b["col_b_3"],
            table_a["col_a_5"] == table_b["col_b_9"],
            table_a["col_a_7"] == table_b["col_b_1"],
        ],
        how="left"
    )
    .join(
        table_c,
        [
            table_a["col_a_1"] == table_c["col_c_10"],
            table_a["col_a_3"] == table_c["col_c_9"],
            table_a["col_a_9"] == table_c["col_c_11"],
        ],
        how="left"
    )
    .join(
        table_d,
        [
            table_a["col_a_1"] == table_d["col_d_0"],
            table_a["col_a_5"] == table_d["col_d_1"],
        ],
        how="left"
    )
    .join(
        table_e,
        table_a["col_a_1"] == table_e["col_e_0"],
        how="left"
    )
)

linked_join_test.write.format("noop").mode("overwrite").save()

# Breadth First Search

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create or get your Spark session
# spark = SparkSession.builder.getOrCreate()

# Breadth-first search over a directed edge list col_a_3 -> col_a_7 taken from
# a sample of table_a.
BFS_EDGE_LIMIT = 100000  # number of table_a rows used as edges
df = dfs['table_a'].limit(BFS_EDGE_LIMIT)

# Cached: every BFS level joins against the same edge list.
edges = df.select(F.col("col_a_3").alias("src"), F.col("col_a_7").alias("dst")).cache()

# Vertex the search starts from. It must occur in col_a_3 for anything to be
# reached.
source = 1000

# Initial frontier: the source vertex at distance 0.
frontier = spark.createDataFrame([(source, 0)], ["vertex", "distance"])

# Every vertex discovered so far, with its distance from the source.
visited = frontier

# Expand one BFS level per iteration. The frontier is only ever replaced by a
# non-empty level, so the loop ends through the `break` once a level is empty.
while True:
    # Neighbors of the current level, one step further from the source, minus
    # anything already visited. Cached because the result is counted, unioned
    # into `visited`, and joined again as the next frontier.
    new_neighbors = (
        frontier.join(edges, frontier.vertex == edges.src)
                .select(edges.dst.alias("vertex"),
                        (frontier.distance + 1).alias("distance"))
                .join(visited, on="vertex", how="left_anti")
                .distinct()
                .cache()
    )

    if new_neighbors.count() == 0:
        new_neighbors.unpersist()
        break

    visited = visited.union(new_neighbors).distinct()
    frontier = new_neighbors

# 'visited' now holds every vertex reachable from the source together with
# its minimum number of steps from the source.
visited.write.format("noop").mode("overwrite").save()
edges.unpersist()

# PageRank

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create or get your Spark session
# spark = SparkSession.builder.getOrCreate()

table_a = dfs['table_a']
# Sample of table_a used to build the graph. It is cached because every
# iteration below re-reads it. Change the limit as needed.
df = table_a.limit(10000).cache()

# Reset (teleportation) probability and number of power iterations.
alpha = 0.15
maxIter = 10

# Personalization seed: the first value of col_a_3 in the sample.
seed = df.select("col_a_3").first()[0]

# 1. Vertices: distinct ids appearing in col_a_3 or col_a_7.
vertices = (
    df.select(F.col("col_a_3").alias("id"))
      .union(df.select(F.col("col_a_7").alias("id")))
      .distinct()
      .cache()
)

# 2. Directed edges col_a_3 -> col_a_7. Duplicate rows are kept, and each one
#    counts as a separate edge.
edges = df.select(F.col("col_a_3").alias("src"), F.col("col_a_7").alias("dst")).cache()

# 3. Out-degree of every vertex with at least one outgoing edge.
out_degrees = edges.groupBy("src").agg(F.count("*").alias("out_degree")).cache()

# 4. Initial ranks: all mass on the seed.
vertices_rank = vertices.withColumn(
    "rank", F.when(F.col("id") == seed, 1.0).otherwise(0.0)
)

# 5. Power iteration.
for i in range(maxIter):
    # 5a/5b. Each edge carries rank(src) / out_degree(src) to its dst; sum per dst.
    contribs_sum = (
        edges.join(out_degrees, on="src")
             .join(vertices_rank.withColumnRenamed("id", "src"), on="src")
             .groupBy(F.col("dst").alias("id"))
             .agg(F.sum(F.col("rank") / F.col("out_degree")).alias("sum_contrib"))
    )

    # 5c. Total rank on dangling vertices (no outgoing edges). It is handed to
    #     the seed in step 5d.
    dangling_sum = (
        vertices_rank.join(out_degrees.withColumnRenamed("src", "id"), on="id", how="left_anti")
                     .agg(F.sum("rank"))
                     .first()[0]
    )
    if dangling_sum is None:
        dangling_sum = 0.0

    # 5d. Update ranks:
    #     - seed:   alpha + (1 - alpha) * (incoming contributions + dangling rank)
    #     - others: (1 - alpha) * incoming contributions
    new_rank = (
        vertices.join(contribs_sum, on="id", how="left")
                .na.fill({"sum_contrib": 0.0})
                .withColumn(
                    "rank",
                    F.when(F.col("id") == seed,
                           alpha + (1 - alpha) * (F.col("sum_contrib") + dangling_sum)
                    ).otherwise(
                           (1 - alpha) * F.col("sum_contrib")
                    )
                )
                .cache()
    )
    # Materialize this iteration before releasing the previous one, so later
    # actions do not recompute the whole chain of earlier iterations.
    new_rank.count()
    vertices_rank.unpersist()
    vertices_rank = new_rank

# 6. Display the final personalized PageRank values, highest first.
vertices_rank.orderBy(F.desc("rank")).show()


# Barabasi-Albert Graph

### Took 49 minutes to run on 2 cores (DBC Edition generic nodes)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark import RDD
import random

# Reuse the active Spark session if there is one; otherwise create one
# named "BA Graph Generator".
spark = (
    SparkSession.builder
    .appName("BA Graph Generator")
    .getOrCreate()
)

def generate_five_tuple(ba_col_a1, ba_col_b1):
    """
    Build the record for one edge between two nodes.

    The two node ids are passed through unchanged. They are followed by two
    random integers in [1024, 65535] and a random label "A" or "B", all drawn
    from the module-level ``random`` generator.

    Returns:
        (ba_col_a1, ba_col_b1, ba_col_a2, ba_col_b2, ba_col_c)
    """
    rand_a2 = random.randint(1024, 65535)
    rand_b2 = random.randint(1024, 65535)
    label = random.choice(["A", "B"])
    return ba_col_a1, ba_col_b1, rand_a2, rand_b2, label

def generate_ba_graph_rdd(total_nodes, m):
    """
    Generate the edge RDD of a Barabasi-Albert-style graph.

    - total_nodes: total number of nodes; ids are 0 .. total_nodes - 1.
    - m: number of distinct edges each new node (ids m .. total_nodes - 1) adds.

    Nodes 0 .. m-1 form an initial complete graph. Each later node then draws m
    distinct targets from a degree-weighted node list, in parallel over an RDD
    of node ids.

    NOTE(review): the degree-weighted list is built once from the initial
    complete graph and broadcast; it is never updated as edges are added.
    Every new node therefore samples only from nodes 0 .. m-1 and, needing m
    distinct targets from m candidates, links to all of them. This is not true
    preferential attachment. TODO: decide whether that is acceptable here.

    Returns:
        RDD of edges, each the tuple produced by generate_five_tuple(a, b):
        (ba_col_a1, ba_col_b1, ba_col_a2, ba_col_b2, ba_col_c).
    """
    sc = spark.sparkContext

    # Step 1: complete graph on the first m nodes.
    initial_nodes = list(range(m))
    initial_edges = [generate_five_tuple(i, j) for i in range(m) for j in range(i + 1, m)]
    edges_rdd = sc.parallelize(initial_edges)

    # Step 2: nodes repeated by their degree. Each node of the initial complete
    # graph has degree m - 1. At least one copy is kept so that m == 1 (a single
    # seed node with no edges) still has a target, instead of random.choice
    # failing on an empty list.
    repeated_nodes = initial_nodes * max(m - 1, 1)
    repeated_nodes_broadcast = sc.broadcast(repeated_nodes)

    def generate_edges_for_new_node(new_node):
        # Draw until m distinct targets are collected. The pool contains exactly
        # m distinct nodes, so this terminates.
        pool = repeated_nodes_broadcast.value
        targets = set()
        while len(targets) < m:
            targets.add(random.choice(pool))
        return [generate_five_tuple(new_node, target) for target in targets]

    # Step 3: edges for every node from m to total_nodes - 1.
    new_edges_rdd = sc.parallelize(range(m, total_nodes)).flatMap(generate_edges_for_new_node)

    # Initial complete graph followed by the generated edges.
    return edges_rdd.union(new_edges_rdd)

if __name__ == "__main__":
    # In a notebook __name__ is "__main__", so this block runs when the cell runs.
    from pyspark.sql.types import LongType

    # Parameters for BA graph
    total_nodes = 100_000_000   # Adjust as needed
    m = 3                       # Each new node connects to m existing nodes

    # Generate the BA graph edges as an RDD
    edges_rdd = generate_ba_graph_rdd(total_nodes, m)

    # Node ids are Python ints, so they are declared LongType. With the default
    # verifySchema=True, createDataFrame rejects int values in a StringType field.
    schema = StructType([
        StructField("ba_col_a1", LongType(), False),
        StructField("ba_col_b1", LongType(), False),
        StructField("ba_col_a2", IntegerType(), False),
        StructField("ba_col_b2", IntegerType(), False),
        StructField("ba_col_c", StringType(), False)
    ])

    # Convert the RDD to a DataFrame
    edges_df = spark.createDataFrame(edges_rdd, schema)
    edges_df.cache()

    # Show a few rows of the generated graph
    edges_df.show(10, truncate=False)

    # Optionally, write the graph data to disk (e.g., CSV)
    # edges_df.write.csv("ba_graph_output.csv", header=True)
