In [4]:
import os
from pyspark.sql import SparkSession
from sklearn.decomposition import TruncatedSVD
from sklearn.cluster import MiniBatchKMeans, KMeans
from scipy.sparse import csr_matrix, vstack
import numpy as np
import joblib

from sklearn.random_projection import SparseRandomProjection
from sklearn.neighbors import NearestNeighbors

from scipy.sparse import vstack as sp_vstack

from pyspark.sql import functions as F
from pyspark.sql.types import BinaryType, IntegerType, StructType, StructField, ArrayType, DoubleType

from pyspark.ml.linalg import SparseVector, DenseVector

import pandas as pd

In [33]:
def spark_session():
    """Build a fresh SparkSession wired for Delta Lake, MySQL JDBC and MinIO (S3A).

    Any session that is already active is stopped first, so that the options
    below are used to create a new session instead of being ignored or only
    partially applied to the running one.

    MinIO credentials are taken from the ``MINIO_ACCESS_KEY`` and
    ``MINIO_SECRET_KEY`` environment variables; when they are unset the
    previous hard-coded development defaults are used.

    Returns:
        SparkSession: the newly created session.
    """
    # Stop any old session so new configs take effect in notebooks
    active = SparkSession.getActiveSession()
    if active is not None:
        active.stop()

    # Keep secrets out of the notebook source; defaults match the local dev stack.
    s3_access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
    s3_secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")

    return (
        SparkSession.builder
        .appName("MySQL_to_Delta_on_MinIO")
        .master("spark://spark-master:7077")
        .config("spark.jars.packages",
                ",".join([
                    # Delta
                    "io.delta:delta-spark_2.12:3.1.0",
                    # MySQL JDBC
                    "mysql:mysql-connector-java:8.0.33",
                    # S3A / MinIO (versions must match your Hadoop)
                    "org.apache.hadoop:hadoop-aws:3.3.2",
                    "com.amazonaws:aws-java-sdk-bundle:1.11.1026",
                ]))
        # Delta integration
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # MinIO (S3A) configs
        .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
        .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
        .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.ui.port", "4040")                 # fix the port
        .config("spark.driver.bindAddress", "0.0.0.0")   # listen on all ifaces
        .config("spark.driver.host", "jupyter")          # OR "spark-master" – the container's DNS name
        .config("spark.ui.showConsoleProgress", "true")
        # Resources
        .config("spark.executor.cores", "2")           # 2 cores per executor
        .config("spark.executor.memory", "3g")
        .config("spark.executor.memoryOverhead", "1g")  # or omit in Standalone
        .config("spark.sql.shuffle.partitions", "50")
        .config("spark.local.dir", "/mnt/spark-tmp/local") # For giving it much more space to run CV
        .config("spark.network.timeout", "600s")
        .getOrCreate()
    )

# Build the session and bind it to the notebook-level `spark` name.
spark = spark_session()

In [34]:
# --- Spark session (reuse the active one if there is one)
spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()

# Input / output Delta locations; both can be overridden through the environment.
GOLD = os.getenv("GOLD_PATH", "s3a://deltabucket/gold/wholeCorp_delta")
OUT = os.getenv("CLUSTER_PATH", "s3a://deltabucket/gold/wholeCorp_clusters")

# Read the table once: `data_whole` keeps every column, `data` is the
# features-only projection used for fitting.
data_whole = spark.read.format("delta").load(GOLD)
data = data_whole.select("features")

# --- get feature size from one row
first_row = data.first()          # None when the table has no rows
if first_row is None:
    raise ValueError(f"No rows found in Delta table at {GOLD}; cannot infer the feature size")
first_vec = first_row[0]
num_features = int(first_vec.size)

# --- helper: Spark Row -> CSR batch
def to_csr(rows, n_features=None):
    """Pack a batch of rows holding sparse vectors into one SciPy CSR matrix.

    Args:
        rows: a sized sequence of row-like objects; ``row["features"]`` must
            expose ``indices`` and ``values`` arrays (as a sparse vector does).
        n_features: number of columns of the resulting matrix. When omitted,
            the module-level ``num_features`` is used, which keeps existing
            ``to_csr(rows)`` calls working unchanged.

    Returns:
        scipy.sparse.csr_matrix of shape ``(len(rows), n_features)`` with
        float64 data and int32 index arrays. An empty ``rows`` gives a
        zero-row matrix.
    """
    # Resolve the width lazily so the global is only needed when no explicit
    # width is passed.
    width = num_features if n_features is None else n_features

    indptr = [0]
    indices = []
    vals = []
    for r in rows:
        sv = r["features"]
        indices.extend(sv.indices.tolist())
        vals.extend(sv.values.tolist())
        # Each row's slice in indices/vals ends where the next one starts.
        indptr.append(indptr[-1] + len(sv.indices))

    return csr_matrix(
        (np.array(vals, dtype=np.float64),
         np.array(indices, dtype=np.int32),
         np.array(indptr, dtype=np.int32)),
        shape=(len(rows), width),
    )

# # Define a UDF to extract values
# def extract_indices(vector):
#     if isinstance(vector, SparseVector):
#         return vector.indices.tolist()
#     return None

# def extract_values(vector):
#     if isinstance(vector, SparseVector):
#         return vector.values.tolist()
#     return None
    
# extract_indices_udf = F.udf(extract_indices, ArrayType(IntegerType()))
# extract_values_udf = F.udf(extract_values, ArrayType(DoubleType()))

# # Apply the UDF to extract the values as a list
# df_with_values = data.withColumn("indices_list", extract_indices_udf("features"))
# df_with_values = df_with_values.withColumn("values_list", extract_values_udf("features"))

# 1) collect a manageable sample from Spark
SAMPLE_FRACTION = 0.02   # ~2% of the rows
SAMPLE_CAP = 20000       # hard upper bound on the number of sampled rows

sample_rows = []
for row in data.sample(False, SAMPLE_FRACTION, seed=42).toLocalIterator():
    sample_rows.append(row)
    # Check the size after appending so exactly SAMPLE_CAP rows are kept at most
    # (the index-based check used before let one extra row through).
    if len(sample_rows) >= SAMPLE_CAP:
        break

if not sample_rows:
    raise ValueError("The Spark sample is empty; cannot fit TruncatedSVD on zero rows")

X_sample = to_csr(sample_rows)            # CSR (n_sample, num_features)
svd = TruncatedSVD(n_components=100, random_state=42).fit(X_sample)

# --- Stage 2: stream the table through the fitted SVD and train
# MiniBatchKMeans incrementally on the reduced batches.
kmeans = MiniBatchKMeans(
    n_clusters=15,
    batch_size=2000,
    n_init='auto',
    random_state=42,
    verbose=1,
)

batch = []
chunks = []
for row in data.toLocalIterator():
    batch.append(row)
    if len(batch) < 2000:
        continue
    # A full batch is ready: reduce it, update the model, keep the reduced rows.
    Xb = to_csr(batch)
    Xr = svd.transform(Xb)
    kmeans.partial_fit(Xr)
    chunks.append(Xr)
    del batch[:]

# Train on the leftover rows as well. They stay in `batch` and are not added
# to `chunks` here; a later cell transforms and appends them.
if batch:
    Xb = to_csr(batch)
    Xr = svd.transform(Xb)
    kmeans.partial_fit(Xr)

# spark.stop()

[MiniBatchKMeans] Reassigning 12 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.
[MiniBatchKMeans] Reassigning 1 cluster centers.


In [31]:
# Preview of the features-only projection. It is bound to its own name so the
# full `data_whole` table is not replaced: the scoring loop further down reads
# the 統一編號 column from `data_whole` rows, which a features-only frame lacks.
features_only = spark.read.format("delta").load(GOLD).select("features")
features_only

DataFrame[features: vector]

In [25]:
# Features-only view of the gold Delta table.
data_raw = (
    spark.read
    .format("delta")
    .load(GOLD)
    .select("features")
)

In [39]:
# Print the first 20 feature vectors with long cells truncated (the defaults, spelled out).
data.show(n=20, truncate=True)

+--------------------+
|            features|
+--------------------+
|(262147,[63958,26...|
|(262147,[63958,26...|
|(262147,[7956,262...|
|(262147,[80719,26...|
|(262147,[63958,26...|
|(262147,[117068,2...|
|(262147,[171092,2...|
|(262147,[256426,2...|
|(262147,[63958,26...|
|(262147,[171092,2...|
|(262147,[63958,26...|
|(262147,[63958,26...|
|(262147,[3201,262...|
|(262147,[247690,2...|
|(262147,[76079,26...|
|(262147,[76079,26...|
|(262147,[240732,2...|
|(262147,[63958,26...|
|(262147,[98664,26...|
|(262147,[36506,26...|
+--------------------+
only showing top 20 rows



In [48]:
BATCH = 2000
rows_buf, ids_buf = [], []

# (Re)create output table empty (optional)
# spark.sql(f"DROP TABLE IF EXISTS delta.`{OUT}`")  # only if you want a fresh table

# Select the ID and feature columns explicitly instead of iterating whatever
# `data_whole` happens to be bound to: another cell rebinds that name to a
# features-only frame, which would make r["統一編號"] fail here.
scoring_df = spark.read.format("delta").load(GOLD).select("統一編號", "features")

for r in scoring_df.toLocalIterator():
    rows_buf.append(r)
    ids_buf.append(r["統一編號"])
    if len(rows_buf) >= BATCH:
        # Reduce the batch with the fitted SVD and assign clusters.
        Xb = to_csr(rows_buf)
        Xr = svd.transform(Xb)
        yb = kmeans.predict(Xr).astype(int)

        # Append this batch's (id, cluster) pairs to the output Delta table.
        pdf = pd.DataFrame({"統一編號": ids_buf, "cluster": yb})
        spark.createDataFrame(pdf).write.format("delta").mode("append").save(OUT)

        rows_buf.clear(); ids_buf.clear()

# Rows left in rows_buf/ids_buf (fewer than BATCH) are flushed by the next cell.

In [49]:
# Flush the rows left over from the batch loop (fewer than BATCH).
if rows_buf:
    Xb = to_csr(rows_buf)
    Xr = svd.transform(Xb)
    yb = kmeans.predict(Xr).astype(int)
    pdf = pd.DataFrame({"統一編號": ids_buf, "cluster": yb})
    spark.createDataFrame(pdf).write.format("delta").mode("append").save(OUT)
    # Empty the buffers once written: the write uses mode("append"), so
    # re-running this cell would otherwise add the same rows again.
    rows_buf.clear(); ids_buf.clear()

In [53]:
# Read back from the same location the scoring cells wrote to. `OUT` honours
# the CLUSTER_PATH environment variable, which a hard-coded path would ignore.
df_cluster = spark.read.format("delta").load(OUT)

In [14]:
# Project `Xb` with the fitted SVD.
# NOTE(review): `Xb` is reassigned for every batch in the loops of other cells,
# so despite the name this presumably covers only the most recently converted
# batch, not the whole table — confirm intent.
whole_Xr = svd.transform(Xb)

In [None]:
# Point OUT at a second clusters table (CLUSTER_PATH still takes precedence).
OUT = os.getenv(
    "CLUSTER_PATH",
    "s3a://deltabucket/gold/wholeCorp_clusters_2",
)

In [None]:
import cloudpickle, pandas as pd, numpy as np
from scipy.sparse import csr_matrix
from pyspark.sql import functions as F, types as T
from pyspark.ml.linalg import SparseVector

# Output schema of the distributed prediction: a non-nullable string ID plus
# a nullable integer cluster label.
schema = T.StructType([
    T.StructField("統一編號", T.StringType(), nullable=False),
    T.StructField("cluster", T.IntegerType(), nullable=True),
])

# Pickle the fitted models (TruncatedSVD / SRP + MiniBatchKMeans) into byte
# strings that the prediction function loads back.
kmeans_blob = cloudpickle.dumps(kmeans)
proj_blob = cloudpickle.dumps(svd)      # or srp

def predict_clusters(it):
    """Assign a cluster to every row of an iterator of pandas DataFrames.

    This is a plain generator function on purpose: it is handed to
    ``DataFrame.mapInPandas(predict_clusters, schema)``, which takes a native
    Python function and applies the schema itself, so it must not be wrapped
    in ``pandas_udf``.

    Args:
        it: iterator of pandas DataFrames with the columns ``統一編號`` (ID)
            and ``features`` (vector objects exposing ``size``; sparse ones
            also ``indices``/``values``, others ``toArray()``).

    Yields:
        pandas.DataFrame with the columns ``統一編號`` and ``cluster`` (int32),
        one frame per non-empty input frame.
    """
    import cloudpickle, numpy as np
    import pandas as pd
    from scipy.sparse import csr_matrix
    from pyspark.ml.linalg import SparseVector

    # Deserialize the models once per call, not once per batch.
    proj = cloudpickle.loads(proj_blob)
    kmeans = cloudpickle.loads(kmeans_blob)

    for pdf in it:
        # An empty batch would give a (0, 0) matrix and make transform() fail
        # on the feature-count check; there is nothing to predict, so skip it.
        if pdf.empty:
            continue

        ids = pdf["統一編號"].tolist()
        vecs = pdf["features"].tolist()

        # build CSR for this batch
        indptr, indices, values = [0], [], []
        size = vecs[0].size
        for v in vecs:
            if isinstance(v, SparseVector):
                indices.extend(v.indices)
                values.extend(v.values)
                indptr.append(indptr[-1] + len(v.indices))
            else:
                # Dense vector: keep only the non-zero entries.
                arr = np.asarray(v.toArray())
                nz = arr.nonzero()[0]
                indices.extend(nz)
                values.extend(arr[nz])
                indptr.append(indptr[-1] + len(nz))
        X = csr_matrix(
            (np.array(values, float), np.array(indices, int), np.array(indptr, int)),
            shape=(len(vecs), size),
        )

        Z = proj.transform(X)             # SVD/SRP transform in bulk
        # int32 matches the IntegerType declared for "cluster" in the schema.
        y = kmeans.predict(Z).astype("int32")
        yield pd.DataFrame({"統一編號": ids, "cluster": y})

# Input for distributed scoring: ID + features, spread over 200 partitions
# (tune to cluster size).
full = (
    spark.read.format("delta")
    .load(GOLD)
    .select("統一編號", "features")
    .repartition(200)
)

# Enable Arrow-based transfer for the pandas function below.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pred = full.mapInPandas(predict_clusters, schema)

# Shrink to 64 partitions before writing to avoid many small files, then
# replace the table at OUT (schema included).
(
    pred.repartition(64)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(OUT)
)


In [71]:
# Number of labels currently stored on the fitted model.
kmeans.labels_.shape[0]

1786

In [66]:
from sklearn.metrics import silhouette_score

# Sweep k and report the silhouette score on `Xr`; the within-cluster sum of
# squares (inertia) is collected in `wcss` for an elbow plot.
wcss = []
for k in range(2, 20):
    # Fit once: the estimator was previously fitted twice per k, doubling the
    # runtime for the same result (random_state is fixed).
    km = KMeans(n_clusters=k, random_state=42, n_init='auto').fit(Xr)
    wcss.append(km.inertia_)

    score = silhouette_score(Xr, km.labels_)
    print(f"k={k}, silhouette={score:0.3f}")
    

k=2, silhouette=0.979
k=3, silhouette=0.926
k=4, silhouette=0.923
k=5, silhouette=0.927
k=6, silhouette=0.929
k=7, silhouette=0.932
k=8, silhouette=0.947
k=9, silhouette=0.956
k=10, silhouette=0.957
k=11, silhouette=0.957
k=12, silhouette=0.960
k=13, silhouette=0.961
k=14, silhouette=0.960
k=15, silhouette=0.962
k=16, silhouette=0.960
k=17, silhouette=0.963
k=18, silhouette=0.963
k=19, silhouette=0.963


In [None]:
from collections import Counter

In [None]:
# Cluster sizes for the rows in `Xr`: predicted label -> number of rows.
Counter(map(int, kmeans.predict(Xr)))

In [None]:
# Collect every reduced batch, plus the rows still sitting in `batch` (the
# tail that never reached a full batch). A copy of `chunks` is extended so
# re-running this cell does not append the tail a second time.
parts = list(chunks)
if batch:
    Xb = to_csr(batch)
    parts.append(svd.transform(Xb))

# Stack according to what the projection produced: sparse blocks (e.g. from a
# sparse random projection) stay CSR, while dense arrays (what TruncatedSVD
# returns) are stacked with NumPy, since scipy.sparse.vstack is meant for
# sparse blocks.
if any(hasattr(p, "tocsr") for p in parts):
    X_all = sp_vstack(parts, format="csr")
else:
    X_all = np.vstack(parts)

# nn = NearestNeighbors(n_neighbors=10, metric="cosine", algorithm="brute").fit(X_all)
nn = NearestNeighbors(n_neighbors=10, metric="cosine").fit(X_all)

# --- Example: query nearest neighbors for company i=123
distances, indices = nn.kneighbors(X_all[123].reshape(1,-1))
print("Nearest neighbor indices:", indices, "distances:", distances)

In [19]:
# After you build X_all (CSR)
nnz = X_all.getnnz(axis=1)              # number of nonzeros per row
print("Zero rows:", int((nnz==0).sum()))

# If you’re querying row i:
i = 123
print("Query nnz:", int(X_all[i].getnnz()))

NameError: name 'X_all' is not defined

In [25]:
print("Cluster centers shape:", kmeans.cluster_centers_.shape)

# --- Save both
# File names are derived from the fitted models, so the label on disk always
# matches the saved model's component / cluster count.
joblib.dump(svd, f"svd_{svd.n_components}.joblib")
joblib.dump(kmeans, f"kmeans_{kmeans.n_clusters}.joblib")

Cluster centers shape: (10, 100)


['kmeans_10.joblib']